This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9376d8d6e1 Refactor: Move `UpdateCoordinatorStateAndPrepareCluster` 
duty out of `DruidCoordinator` (#14845)
9376d8d6e1 is described below

commit 9376d8d6e1c952a57aef1aa6e863547f95c338a7
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Aug 22 19:50:41 2023 +0530

    Refactor: Move `UpdateCoordinatorStateAndPrepareCluster` duty out of 
`DruidCoordinator` (#14845)
    
    Motivation:
    - Clean up `DruidCoordinator` and move methods to classes where they are 
most relevant
    
    Changes:
    - No functional change
    - Add duty `PrepareBalancerAndLoadQueues` to replace 
`UpdateCoordinatorState`
    - Move map of `LoadQueuePeon` from `DruidCoordinator` to 
`LoadQueueTaskMaster`
    - Make `BalancerStrategyFactory` an abstract class and keep the balancer 
executor here
    - Move reporting of used segment stats and historical capacity stats from
    `CollectSegmentAndServerStats` to `PrepareBalancerAndLoadQueues`
    - Move reporting of unavailable and under-replicated segment stats from
    `CollectSegmentAndServerStats` to `UpdateReplicationStatus` duty
---
 .../druid/server/coordinator/DruidCoordinator.java | 253 +++------------------
 .../coordinator/DruidCoordinatorRuntimeParams.java |   2 +-
 .../balancer/BalancerStrategyFactory.java          |  45 +++-
 .../CachingCostBalancerStrategyFactory.java        |   5 +-
 .../balancer/CostBalancerStrategyFactory.java      |   8 +-
 ...DisabledCachingCostBalancerStrategyFactory.java |   7 +-
 .../DiskNormalizedCostBalancerStrategyFactory.java |   8 +-
 .../balancer/RandomBalancerStrategyFactory.java    |   6 +-
 .../duty/CollectSegmentAndServerStats.java         |  70 ++----
 .../duty/PrepareBalancerAndLoadQueues.java         | 196 ++++++++++++++++
 .../coordinator/loading/LoadQueueTaskMaster.java   |  80 ++++++-
 .../server/coordinator/DruidCoordinatorTest.java   |  71 +-----
 .../balancer/BalancerStrategyFactoryTest.java      |  45 ++--
 .../coordinator/duty/BalanceSegmentsTest.java      |   4 +-
 .../duty/CollectSegmentAndServerStatsTest.java     |  22 +-
 .../coordinator/duty/CompactSegmentsTest.java      |   2 +-
 .../duty/MarkOvershadowedSegmentsAsUnusedTest.java |   2 +-
 17 files changed, 437 insertions(+), 389 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 832f7790ad..faa399ba73 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -23,8 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import com.google.inject.Inject;
 import it.unimi.dsi.fastutil.objects.Object2IntMap;
@@ -35,7 +33,6 @@ import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.DruidDataSource;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.client.ImmutableDruidDataSource;
-import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.client.ServerInventoryView;
 import org.apache.druid.client.coordinator.Coordinator;
 import org.apache.druid.common.config.JacksonConfigManager;
@@ -47,7 +44,6 @@ import 
org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Stopwatch;
-import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.common.guava.Comparators;
@@ -60,7 +56,6 @@ import org.apache.druid.metadata.MetadataRuleManager;
 import org.apache.druid.metadata.SegmentsMetadataManager;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.server.DruidNode;
-import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
 import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
 import 
org.apache.druid.server.coordinator.compact.CompactionSegmentSearchPolicy;
 import org.apache.druid.server.coordinator.duty.BalanceSegments;
@@ -70,12 +65,12 @@ import 
org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
 import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
 import 
org.apache.druid.server.coordinator.duty.MarkOvershadowedSegmentsAsUnused;
+import org.apache.druid.server.coordinator.duty.PrepareBalancerAndLoadQueues;
 import org.apache.druid.server.coordinator.duty.RunRules;
 import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments;
 import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
 import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
 import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
-import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
 import org.apache.druid.server.coordinator.loading.SegmentReplicaCount;
 import org.apache.druid.server.coordinator.loading.SegmentReplicationStatus;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
@@ -98,7 +93,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -144,7 +138,6 @@ public class DruidCoordinator
   private final ScheduledExecutorFactory executorFactory;
   private final Map<String, ScheduledExecutorService> dutyGroupExecutors = new 
HashMap<>();
   private final LoadQueueTaskMaster taskMaster;
-  private final ConcurrentHashMap<String, LoadQueuePeon> loadManagementPeons = 
new ConcurrentHashMap<>();
   private final SegmentLoadQueueManager loadQueueManager;
   private final ServiceAnnouncer serviceAnnouncer;
   private final DruidNode self;
@@ -171,9 +164,6 @@ public class DruidCoordinator
    */
   private volatile SegmentReplicationStatus segmentReplicationStatus = null;
 
-  private int cachedBalancerThreadNumber;
-  private ListeningExecutorService balancerExec;
-
   public static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = 
"HistoricalManagementDuties";
   private static final String METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP = 
"MetadataStoreManagementDuties";
   private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP = 
"IndexingServiceDuties";
@@ -233,7 +223,7 @@ public class DruidCoordinator
 
   public Map<String, LoadQueuePeon> getLoadManagementPeons()
   {
-    return loadManagementPeons;
+    return taskMaster.getAllPeons();
   }
 
   public Map<String, Object2LongMap<String>> 
getTierToDatasourceToUnderReplicatedCount(boolean useClusterView)
@@ -344,18 +334,6 @@ public class DruidCoordinator
     return coordLeaderSelector.getCurrentLeader();
   }
 
-  @VisibleForTesting
-  public int getCachedBalancerThreadNumber()
-  {
-    return cachedBalancerThreadNumber;
-  }
-
-  @VisibleForTesting
-  public ListeningExecutorService getBalancerExec()
-  {
-    return balancerExec;
-  }
-
   @LifecycleStart
   public void start()
   {
@@ -397,10 +375,7 @@ public class DruidCoordinator
       started = false;
 
       stopAllDutyGroupExecutors();
-
-      if (balancerExec != null) {
-        balancerExec.shutdownNow();
-      }
+      balancerStrategyFactory.stopExecutor();
     }
   }
 
@@ -440,6 +415,7 @@ public class DruidCoordinator
           config.getCoordinatorStartDelay()
       );
 
+      taskMaster.onLeaderStart();
       segmentsMetadataManager.startPollingDatabasePeriodically();
       segmentsMetadataManager.populateUsedFlagLastUpdatedAsync();
       metadataRuleManager.start();
@@ -524,22 +500,13 @@ public class DruidCoordinator
 
       log.info("I am no longer the leader...");
 
-      for (String server : loadManagementPeons.keySet()) {
-        LoadQueuePeon peon = loadManagementPeons.remove(server);
-        peon.stop();
-      }
-      loadManagementPeons.clear();
-
+      taskMaster.onLeaderStop();
       serviceAnnouncer.unannounce(self);
       lookupCoordinatorManager.stop();
       metadataRuleManager.stop();
       segmentsMetadataManager.stopPollingDatabasePeriodically();
       segmentsMetadataManager.stopAsyncUsedFlagLastUpdatedUpdate();
-
-      if (balancerExec != null) {
-        balancerExec.shutdownNow();
-        balancerExec = null;
-      }
+      balancerStrategyFactory.stopExecutor();
     }
   }
 
@@ -559,53 +526,21 @@ public class DruidCoordinator
     dutyGroupExecutors.clear();
   }
 
-  /**
-   * Resets the balancerExec if required and creates a new BalancerStrategy for
-   * the current coordinator run.
-   */
-  @VisibleForTesting
-  BalancerStrategy createBalancerStrategy(int balancerComputeThreads)
-  {
-    // Reset balancerExecutor if required
-    if (balancerExec == null) {
-      balancerExec = createNewBalancerExecutor(balancerComputeThreads);
-    } else if (cachedBalancerThreadNumber != balancerComputeThreads) {
-      log.info(
-          "'balancerComputeThreads' has changed from [%d] to [%d]",
-          cachedBalancerThreadNumber, balancerComputeThreads
-      );
-      balancerExec.shutdownNow();
-      balancerExec = createNewBalancerExecutor(balancerComputeThreads);
-    }
-
-    // Create BalancerStrategy
-    final BalancerStrategy balancerStrategy = 
balancerStrategyFactory.createBalancerStrategy(balancerExec);
-    log.info(
-        "Using balancer strategy[%s] with [%d] threads.",
-        balancerStrategy.getClass().getSimpleName(), balancerComputeThreads
-    );
-    return balancerStrategy;
-  }
-
-  private ListeningExecutorService createNewBalancerExecutor(int numThreads)
-  {
-    log.info("Creating new balancer executor with [%d] threads.", numThreads);
-    cachedBalancerThreadNumber = numThreads;
-    return MoreExecutors.listeningDecorator(
-        Execs.multiThreaded(numThreads, "coordinator-cost-balancer-%s")
-    );
-  }
-
   private List<CoordinatorDuty> makeHistoricalManagementDuties()
   {
     return ImmutableList.of(
-        new UpdateCoordinatorStateAndPrepareCluster(),
+        new PrepareBalancerAndLoadQueues(
+            taskMaster,
+            loadQueueManager,
+            balancerStrategyFactory,
+            serverInventoryView
+        ),
         new RunRules(segmentsMetadataManager::markSegmentsAsUnused),
         new UpdateReplicationStatus(),
         new UnloadUnusedSegments(loadQueueManager),
         new 
MarkOvershadowedSegmentsAsUnused(segmentsMetadataManager::markSegmentsAsUnused),
         new BalanceSegments(config.getCoordinatorPeriod()),
-        new CollectSegmentAndServerStats(DruidCoordinator.this)
+        new CollectSegmentAndServerStats(taskMaster)
     );
   }
 
@@ -724,7 +659,7 @@ public class DruidCoordinator
             DruidCoordinatorRuntimeParams
                 .newBuilder(coordinatorStartTime)
                 .withDatabaseRuleManager(metadataRuleManager)
-                
.withSnapshotOfDataSourcesWithAllUsedSegments(dataSourcesSnapshot)
+                .withDataSourcesSnapshot(dataSourcesSnapshot)
                 .withDynamicConfigs(getDynamicConfigs())
                 .withCompactionConfig(getCompactionConfig())
                 .build();
@@ -816,149 +751,6 @@ public class DruidCoordinator
     }
   }
 
-  /**
-   * This duty does the following:
-   * <ul>
-   *   <li>Prepares an immutable {@link DruidCluster} consisting of {@link 
ServerHolder}s
-   *   which represent the current state of the servers in the cluster.</li>
-   *   <li>Starts and stops load peons for new and disappeared servers 
respectively.</li>
-   *   <li>Cancels in-progress loads on all decommissioning servers. This is 
done
-   *   here to ensure that under-replicated segments are assigned to active 
servers
-   *   in the {@link RunRules} duty after this.</li>
-   *   <li>Initializes the {@link BalancerStrategy} for the run.</li>
-   * </ul>
-   *
-   * @see #makeHistoricalManagementDuties() for the order of duties
-   */
-  private class UpdateCoordinatorStateAndPrepareCluster implements 
CoordinatorDuty
-  {
-    @Nullable
-    @Override
-    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams 
params)
-    {
-      List<ImmutableDruidServer> currentServers = prepareCurrentServers();
-
-      startPeonsForNewServers(currentServers);
-      stopPeonsForDisappearedServers(currentServers);
-
-      final CoordinatorDynamicConfig dynamicConfig = 
params.getCoordinatorDynamicConfig();
-      final SegmentLoadingConfig segmentLoadingConfig = 
params.getSegmentLoadingConfig();
-
-      final DruidCluster cluster = prepareCluster(dynamicConfig, 
segmentLoadingConfig, currentServers);
-      cancelLoadsOnDecommissioningServers(cluster);
-
-      final BalancerStrategy balancerStrategy
-          = 
createBalancerStrategy(segmentLoadingConfig.getBalancerComputeThreads());
-      return params.buildFromExisting()
-                   .withDruidCluster(cluster)
-                   .withBalancerStrategy(balancerStrategy)
-                   .withSegmentAssignerUsing(loadQueueManager)
-                   .build();
-    }
-
-    /**
-     * Cancels all load/move operations on decommissioning servers. This should
-     * be done before initializing the SegmentReplicantLookup so that
-     * under-replicated segments can be assigned in the current run itself.
-     */
-    private void cancelLoadsOnDecommissioningServers(DruidCluster cluster)
-    {
-      final AtomicInteger cancelledCount = new AtomicInteger(0);
-      final List<ServerHolder> decommissioningServers
-          = cluster.getAllServers().stream()
-                   .filter(ServerHolder::isDecommissioning)
-                   .collect(Collectors.toList());
-
-      for (ServerHolder server : decommissioningServers) {
-        server.getQueuedSegments().forEach(
-            (segment, action) -> {
-              // Cancel the operation if it is a type of load
-              if (action.isLoad() && server.cancelOperation(action, segment)) {
-                cancelledCount.incrementAndGet();
-              }
-            }
-        );
-      }
-
-      if (cancelledCount.get() > 0) {
-        log.info(
-            "Cancelled [%d] load/move operations on [%d] decommissioning 
servers.",
-            cancelledCount.get(), decommissioningServers.size()
-        );
-      }
-    }
-
-    List<ImmutableDruidServer> prepareCurrentServers()
-    {
-      List<ImmutableDruidServer> currentServers = serverInventoryView
-          .getInventory()
-          .stream()
-          .filter(DruidServer::isSegmentReplicationOrBroadcastTarget)
-          .map(DruidServer::toImmutableDruidServer)
-          .collect(Collectors.toList());
-
-      if (log.isDebugEnabled()) {
-        // Display info about all segment-replicatable (historical and bridge) 
servers
-        log.debug("Servers");
-        for (ImmutableDruidServer druidServer : currentServers) {
-          log.debug("  %s", druidServer);
-          log.debug("    -- DataSources");
-          for (ImmutableDruidDataSource druidDataSource : 
druidServer.getDataSources()) {
-            log.debug("    %s", druidDataSource);
-          }
-        }
-      }
-      return currentServers;
-    }
-
-    void startPeonsForNewServers(List<ImmutableDruidServer> currentServers)
-    {
-      for (ImmutableDruidServer server : currentServers) {
-        loadManagementPeons.computeIfAbsent(server.getName(), serverName -> {
-          LoadQueuePeon loadQueuePeon = taskMaster.giveMePeon(server);
-          loadQueuePeon.start();
-          log.debug("Created LoadQueuePeon for server[%s].", server.getName());
-          return loadQueuePeon;
-        });
-      }
-    }
-
-    DruidCluster prepareCluster(
-        CoordinatorDynamicConfig dynamicConfig,
-        SegmentLoadingConfig segmentLoadingConfig,
-        List<ImmutableDruidServer> currentServers
-    )
-    {
-      final Set<String> decommissioningServers = 
dynamicConfig.getDecommissioningNodes();
-      final DruidCluster.Builder cluster = DruidCluster.builder();
-      for (ImmutableDruidServer server : currentServers) {
-        cluster.add(
-            new ServerHolder(
-                server,
-                loadManagementPeons.get(server.getName()),
-                decommissioningServers.contains(server.getHost()),
-                segmentLoadingConfig.getMaxSegmentsInLoadQueue(),
-                segmentLoadingConfig.getMaxLifetimeInLoadQueue()
-            )
-        );
-      }
-      return cluster.build();
-    }
-
-    void stopPeonsForDisappearedServers(List<ImmutableDruidServer> servers)
-    {
-      final Set<String> disappeared = 
Sets.newHashSet(loadManagementPeons.keySet());
-      for (ImmutableDruidServer server : servers) {
-        disappeared.remove(server.getName());
-      }
-      for (String name : disappeared) {
-        log.debug("Removing listener for server[%s] which is no longer 
there.", name);
-        LoadQueuePeon peon = loadManagementPeons.remove(name);
-        peon.stop();
-      }
-    }
-  }
-
   /**
    * Updates replication status of all used segments. This duty must run after
    * {@link RunRules} so that the number of required replicas for all segments
@@ -971,6 +763,23 @@ public class DruidCoordinator
     public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams 
params)
     {
       segmentReplicationStatus = params.getSegmentReplicationStatus();
+
+      // Collect stats for unavailable and under-replicated segments
+      final CoordinatorRunStats stats = params.getCoordinatorStats();
+      getDatasourceToUnavailableSegmentCount().forEach(
+          (dataSource, numUnavailable) -> stats.add(
+              Stats.Segments.UNAVAILABLE,
+              RowKey.of(Dimension.DATASOURCE, dataSource),
+              numUnavailable
+          )
+      );
+      getTierToDatasourceToUnderReplicatedCount(false).forEach(
+          (tier, countsPerDatasource) -> countsPerDatasource.forEach(
+              (dataSource, underReplicatedCount) ->
+                  stats.addToSegmentStat(Stats.Segments.UNDER_REPLICATED, 
tier, dataSource, underReplicatedCount)
+          )
+      );
+
       return params;
     }
 
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
index 79173683fa..07a7bf1b2e 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
@@ -332,7 +332,7 @@ public class DruidCoordinatorRuntimeParams
       return this;
     }
 
-    public Builder 
withSnapshotOfDataSourcesWithAllUsedSegments(DataSourcesSnapshot snapshot)
+    public Builder withDataSourcesSnapshot(DataSourcesSnapshot snapshot)
     {
       this.usedSegments = 
createUsedSegmentsSet(snapshot.iterateAllUsedSegmentsInSnapshot());
       this.dataSourcesSnapshot = snapshot;
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactory.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactory.java
index d0d426f7b5..0dd05cb1b5 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactory.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactory.java
@@ -22,6 +22,9 @@ package org.apache.druid.server.coordinator.balancer;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.logger.Logger;
 
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "strategy", defaultImpl = 
CostBalancerStrategyFactory.class)
 @JsonSubTypes(value = {
@@ -30,7 +33,45 @@ import 
com.google.common.util.concurrent.ListeningExecutorService;
     @JsonSubTypes.Type(name = "diskNormalized", value = 
DiskNormalizedCostBalancerStrategyFactory.class),
     @JsonSubTypes.Type(name = "random", value = 
RandomBalancerStrategyFactory.class)
 })
-public interface BalancerStrategyFactory
+public abstract class BalancerStrategyFactory
 {
-  BalancerStrategy createBalancerStrategy(ListeningExecutorService exec);
+  private static final Logger log = new Logger(BalancerStrategyFactory.class);
+
+  public abstract BalancerStrategy createBalancerStrategy(int 
numBalancerThreads);
+
+  private int cachedBalancerThreadNumber;
+  private ListeningExecutorService balancerExec;
+
+  public void stopExecutor()
+  {
+    if (balancerExec != null) {
+      balancerExec.shutdownNow();
+      balancerExec = null;
+    }
+  }
+
+  protected ListeningExecutorService getOrCreateBalancerExecutor(int 
balancerComputeThreads)
+  {
+    if (balancerExec == null) {
+      balancerExec = createNewBalancerExecutor(balancerComputeThreads);
+    } else if (cachedBalancerThreadNumber != balancerComputeThreads) {
+      log.info(
+          "'balancerComputeThreads' has changed from [%d] to [%d].",
+          cachedBalancerThreadNumber, balancerComputeThreads
+      );
+      balancerExec.shutdownNow();
+      balancerExec = createNewBalancerExecutor(balancerComputeThreads);
+    }
+
+    return balancerExec;
+  }
+
+  private ListeningExecutorService createNewBalancerExecutor(int numThreads)
+  {
+    log.info("Creating new balancer executor with [%d] threads.", numThreads);
+    cachedBalancerThreadNumber = numThreads;
+    return MoreExecutors.listeningDecorator(
+        Execs.multiThreaded(numThreads, "coordinator-cost-balancer-%s")
+    );
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
index 89b7cf7546..8c78bff7b6 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
@@ -44,7 +44,7 @@ import java.util.concurrent.RejectedExecutionException;
  * and will be removed in future releases.
  */
 @Deprecated
-public class CachingCostBalancerStrategyFactory implements 
BalancerStrategyFactory
+public class CachingCostBalancerStrategyFactory extends BalancerStrategyFactory
 {
   private static final EmittingLogger LOG = new 
EmittingLogger(CachingCostBalancerStrategyFactory.class);
 
@@ -128,8 +128,9 @@ public class CachingCostBalancerStrategyFactory implements 
BalancerStrategyFacto
   }
 
   @Override
-  public BalancerStrategy createBalancerStrategy(final 
ListeningExecutorService exec)
+  public BalancerStrategy createBalancerStrategy(int numBalancerThreads)
   {
+    final ListeningExecutorService exec = 
getOrCreateBalancerExecutor(numBalancerThreads);
     LOG.warn(
         "'cachingCost' balancer strategy has been deprecated as it can lead to"
         + " unbalanced clusters. Use 'cost' strategy instead."
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyFactory.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyFactory.java
index 10d5952390..6894123c57 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyFactory.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyFactory.java
@@ -19,13 +19,11 @@
 
 package org.apache.druid.server.coordinator.balancer;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
-
-public class CostBalancerStrategyFactory implements BalancerStrategyFactory
+public class CostBalancerStrategyFactory extends BalancerStrategyFactory
 {
   @Override
-  public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec)
+  public CostBalancerStrategy createBalancerStrategy(int numBalancerThreads)
   {
-    return new CostBalancerStrategy(exec);
+    return new 
CostBalancerStrategy(getOrCreateBalancerExecutor(numBalancerThreads));
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DisabledCachingCostBalancerStrategyFactory.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DisabledCachingCostBalancerStrategyFactory.java
index 7d2f0d96bc..d1881d477c 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DisabledCachingCostBalancerStrategyFactory.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DisabledCachingCostBalancerStrategyFactory.java
@@ -19,17 +19,16 @@
 
 package org.apache.druid.server.coordinator.balancer;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.druid.java.util.common.logger.Logger;
 
-public class DisabledCachingCostBalancerStrategyFactory implements 
BalancerStrategyFactory
+public class DisabledCachingCostBalancerStrategyFactory extends 
BalancerStrategyFactory
 {
   private static final Logger log = new Logger(BalancerStrategyFactory.class);
 
   @Override
-  public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec)
+  public BalancerStrategy createBalancerStrategy(int numBalancerThreads)
   {
     log.warn("Balancer strategy 'cachingCost' is disabled. Using 'cost' 
strategy instead.");
-    return new CostBalancerStrategy(exec);
+    return new 
CostBalancerStrategy(getOrCreateBalancerExecutor(numBalancerThreads));
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyFactory.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyFactory.java
index 9c404b97e5..3389f6732e 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyFactory.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategyFactory.java
@@ -19,13 +19,11 @@
 
 package org.apache.druid.server.coordinator.balancer;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
-
-public class DiskNormalizedCostBalancerStrategyFactory implements 
BalancerStrategyFactory
+public class DiskNormalizedCostBalancerStrategyFactory extends 
BalancerStrategyFactory
 {
   @Override
-  public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec)
+  public BalancerStrategy createBalancerStrategy(int numBalancerThreads)
   {
-    return new DiskNormalizedCostBalancerStrategy(exec);
+    return new 
DiskNormalizedCostBalancerStrategy(getOrCreateBalancerExecutor(numBalancerThreads));
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategyFactory.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategyFactory.java
index 2655df5338..6b97dee713 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategyFactory.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategyFactory.java
@@ -19,12 +19,10 @@
 
 package org.apache.druid.server.coordinator.balancer;
 
-import com.google.common.util.concurrent.ListeningExecutorService;
-
-public class RandomBalancerStrategyFactory implements BalancerStrategyFactory
+public class RandomBalancerStrategyFactory extends BalancerStrategyFactory
 {
   @Override
-  public BalancerStrategy createBalancerStrategy(ListeningExecutorService exec)
+  public BalancerStrategy createBalancerStrategy(int numBalancerThreads)
   {
     return new RandomBalancerStrategy();
   }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
index 8c5acaeebb..da2f1e1a04 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
@@ -20,18 +20,18 @@
 package org.apache.druid.server.coordinator.duty;
 
 import com.google.common.util.concurrent.AtomicDouble;
+import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.DruidCluster;
-import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.server.coordinator.loading.LoadQueuePeon;
+import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.server.coordinator.stats.Dimension;
 import org.apache.druid.server.coordinator.stats.RowKey;
 import org.apache.druid.server.coordinator.stats.Stats;
-import org.apache.druid.timeline.DataSegment;
 
 import java.util.Map;
 import java.util.Set;
@@ -45,11 +45,11 @@ public class CollectSegmentAndServerStats implements 
CoordinatorDuty
 {
   private static final Logger log = new 
Logger(CollectSegmentAndServerStats.class);
 
-  private final DruidCoordinator coordinator;
+  private final LoadQueueTaskMaster taskMaster;
 
-  public CollectSegmentAndServerStats(DruidCoordinator coordinator)
+  public CollectSegmentAndServerStats(LoadQueueTaskMaster taskMaster)
   {
-    this.coordinator = coordinator;
+    this.taskMaster = taskMaster;
   }
 
   @Override
@@ -57,25 +57,15 @@ public class CollectSegmentAndServerStats implements 
CoordinatorDuty
   {
     params.getDruidCluster().getHistoricals()
           .forEach(this::logHistoricalTierStats);
-    collectSegmentStats(params);
+    logServerDebuggingInfo(params.getDruidCluster());
+    collectLoadQueueStats(params.getCoordinatorStats());
 
     return params;
   }
 
-  private void collectSegmentStats(DruidCoordinatorRuntimeParams params)
+  private void collectLoadQueueStats(CoordinatorRunStats stats)
   {
-    final CoordinatorRunStats stats = params.getCoordinatorStats();
-
-    final DruidCluster cluster = params.getDruidCluster();
-    cluster.getHistoricals().forEach((tier, historicals) -> {
-      final RowKey rowKey = RowKey.of(Dimension.TIER, tier);
-      stats.add(Stats.Tier.HISTORICAL_COUNT, rowKey, historicals.size());
-      long totalCapacity = 
historicals.stream().map(ServerHolder::getMaxSize).reduce(0L, Long::sum);
-      stats.add(Stats.Tier.TOTAL_CAPACITY, rowKey, totalCapacity);
-    });
-
-    // Collect load queue stats
-    coordinator.getLoadManagementPeons().forEach((serverName, queuePeon) -> {
+    taskMaster.getAllPeons().forEach((serverName, queuePeon) -> {
       final RowKey rowKey = RowKey.of(Dimension.SERVER, serverName);
       stats.add(Stats.SegmentQueue.BYTES_TO_LOAD, rowKey, 
queuePeon.getSizeOfSegmentsToLoad());
       stats.add(Stats.SegmentQueue.NUM_TO_LOAD, rowKey, 
queuePeon.getSegmentsToLoad().size());
@@ -86,33 +76,6 @@ public class CollectSegmentAndServerStats implements 
CoordinatorDuty
               stats.add(stat, createRowKeyForServer(serverName, 
key.getValues()), statValue)
       );
     });
-
-    coordinator.getDatasourceToUnavailableSegmentCount().forEach(
-        (dataSource, numUnavailable) -> stats.add(
-            Stats.Segments.UNAVAILABLE,
-            RowKey.of(Dimension.DATASOURCE, dataSource),
-            numUnavailable
-        )
-    );
-
-    coordinator.getTierToDatasourceToUnderReplicatedCount(false).forEach(
-        (tier, countsPerDatasource) -> countsPerDatasource.forEach(
-            (dataSource, underReplicatedCount) ->
-                stats.addToSegmentStat(Stats.Segments.UNDER_REPLICATED, tier, 
dataSource, underReplicatedCount)
-        )
-    );
-
-    // Collect total segment stats
-    params.getUsedSegmentsTimelinesPerDataSource().forEach(
-        (dataSource, timeline) -> {
-          long totalSizeOfUsedSegments = timeline.iterateAllObjects().stream()
-                                                 
.mapToLong(DataSegment::getSize).sum();
-
-          RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, dataSource);
-          stats.add(Stats.Segments.USED_BYTES, datasourceKey, 
totalSizeOfUsedSegments);
-          stats.add(Stats.Segments.USED, datasourceKey, 
timeline.getNumObjects());
-        }
-    );
   }
 
   private RowKey createRowKeyForServer(String serverName, Map<Dimension, 
String> dimensionValues)
@@ -151,4 +114,19 @@ public class CollectSegmentAndServerStats implements 
CoordinatorDuty
     );
   }
 
+  private void logServerDebuggingInfo(DruidCluster cluster)
+  {
+    if (log.isDebugEnabled()) {
+      log.debug("Servers");
+      for (ServerHolder serverHolder : cluster.getAllServers()) {
+        ImmutableDruidServer druidServer = serverHolder.getServer();
+        log.debug("  %s", druidServer);
+        log.debug("    -- DataSources");
+        for (ImmutableDruidDataSource druidDataSource : 
druidServer.getDataSources()) {
+          log.debug("    %s", druidDataSource);
+        }
+      }
+    }
+  }
+
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java
new file mode 100644
index 0000000000..197adce91e
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/PrepareBalancerAndLoadQueues.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.ImmutableDruidServer;
+import org.apache.druid.client.ServerInventoryView;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.DruidCluster;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
+import org.apache.druid.server.coordinator.balancer.BalancerStrategyFactory;
+import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
+import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * This duty does the following:
+ * <ul>
+ *   <li>Creates an immutable {@link DruidCluster} consisting of {@link 
ServerHolder}s
+ *   which represent the current state of the servers in the cluster.</li>
+ *   <li>Starts and stops load peons for new and disappeared servers 
respectively.</li>
+ *   <li>Cancels in-progress loads on all decommissioning servers. This is done
+ *   here to ensure that under-replicated segments are assigned to active 
servers
+ *   in the {@link RunRules} duty after this.</li>
+ *   <li>Initializes the {@link BalancerStrategy} for the run.</li>
+ * </ul>
+ */
+public class PrepareBalancerAndLoadQueues implements CoordinatorDuty
+{
+  private static final Logger log = new 
Logger(PrepareBalancerAndLoadQueues.class);
+
+  private final LoadQueueTaskMaster taskMaster;
+  private final SegmentLoadQueueManager loadQueueManager;
+  private final ServerInventoryView serverInventoryView;
+  private final BalancerStrategyFactory balancerStrategyFactory;
+
+  public PrepareBalancerAndLoadQueues(
+      LoadQueueTaskMaster taskMaster,
+      SegmentLoadQueueManager loadQueueManager,
+      BalancerStrategyFactory balancerStrategyFactory,
+      ServerInventoryView serverInventoryView
+  )
+  {
+    this.taskMaster = taskMaster;
+    this.loadQueueManager = loadQueueManager;
+    this.balancerStrategyFactory = balancerStrategyFactory;
+    this.serverInventoryView = serverInventoryView;
+  }
+
+  @Override
+  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams 
params)
+  {
+    List<ImmutableDruidServer> currentServers = prepareCurrentServers();
+    taskMaster.resetPeonsForNewServers(currentServers);
+
+    final CoordinatorDynamicConfig dynamicConfig = 
params.getCoordinatorDynamicConfig();
+    final SegmentLoadingConfig segmentLoadingConfig = 
params.getSegmentLoadingConfig();
+
+    final DruidCluster cluster = prepareCluster(dynamicConfig, 
segmentLoadingConfig, currentServers);
+    cancelLoadsOnDecommissioningServers(cluster);
+
+    final CoordinatorRunStats stats = params.getCoordinatorStats();
+    collectHistoricalStats(cluster, stats);
+    collectUsedSegmentStats(params, stats);
+
+    int numBalancerThreads = 
params.getCoordinatorDynamicConfig().getBalancerComputeThreads();
+    final BalancerStrategy balancerStrategy = 
balancerStrategyFactory.createBalancerStrategy(numBalancerThreads);
+    log.info(
+        "Using balancer strategy [%s] with [%d] threads.",
+        balancerStrategy.getClass().getSimpleName(), numBalancerThreads
+    );
+
+    return params.buildFromExisting()
+                 .withDruidCluster(cluster)
+                 .withBalancerStrategy(balancerStrategy)
+                 .withSegmentAssignerUsing(loadQueueManager)
+                 .build();
+  }
+
+  /**
+   * Cancels all load/move operations on decommissioning servers. This should
+   * be done before initializing the SegmentReplicantLookup so that
+   * under-replicated segments can be assigned in the current run itself.
+   */
+  private void cancelLoadsOnDecommissioningServers(DruidCluster cluster)
+  {
+    final AtomicInteger cancelledCount = new AtomicInteger(0);
+    final List<ServerHolder> decommissioningServers
+        = cluster.getAllServers().stream()
+                 .filter(ServerHolder::isDecommissioning)
+                 .collect(Collectors.toList());
+
+    for (ServerHolder server : decommissioningServers) {
+      server.getQueuedSegments().forEach(
+          (segment, action) -> {
+            // Cancel the operation if it is a type of load
+            if (action.isLoad() && server.cancelOperation(action, segment)) {
+              cancelledCount.incrementAndGet();
+            }
+          }
+      );
+    }
+
+    if (cancelledCount.get() > 0) {
+      log.info(
+          "Cancelled [%d] load/move operations on [%d] decommissioning 
servers.",
+          cancelledCount.get(), decommissioningServers.size()
+      );
+    }
+  }
+
+  private List<ImmutableDruidServer> prepareCurrentServers()
+  {
+    return serverInventoryView
+        .getInventory()
+        .stream()
+        .filter(DruidServer::isSegmentReplicationOrBroadcastTarget)
+        .map(DruidServer::toImmutableDruidServer)
+        .collect(Collectors.toList());
+  }
+
+  private DruidCluster prepareCluster(
+      CoordinatorDynamicConfig dynamicConfig,
+      SegmentLoadingConfig segmentLoadingConfig,
+      List<ImmutableDruidServer> currentServers
+  )
+  {
+    final Set<String> decommissioningServers = 
dynamicConfig.getDecommissioningNodes();
+    final DruidCluster.Builder cluster = DruidCluster.builder();
+    for (ImmutableDruidServer server : currentServers) {
+      cluster.add(
+          new ServerHolder(
+              server,
+              taskMaster.getPeonForServer(server),
+              decommissioningServers.contains(server.getHost()),
+              segmentLoadingConfig.getMaxSegmentsInLoadQueue(),
+              segmentLoadingConfig.getMaxLifetimeInLoadQueue()
+          )
+      );
+    }
+    return cluster.build();
+  }
+
+  private void collectHistoricalStats(DruidCluster cluster, 
CoordinatorRunStats stats)
+  {
+    cluster.getHistoricals().forEach((tier, historicals) -> {
+      RowKey rowKey = RowKey.of(Dimension.TIER, tier);
+      stats.add(Stats.Tier.HISTORICAL_COUNT, rowKey, historicals.size());
+
+      long totalCapacity = 
historicals.stream().mapToLong(ServerHolder::getMaxSize).sum();
+      stats.add(Stats.Tier.TOTAL_CAPACITY, rowKey, totalCapacity);
+    });
+  }
+
+  private void collectUsedSegmentStats(DruidCoordinatorRuntimeParams params, 
CoordinatorRunStats stats)
+  {
+    params.getUsedSegmentsTimelinesPerDataSource().forEach((dataSource, 
timeline) -> {
+      long totalSizeOfUsedSegments = timeline.iterateAllObjects().stream()
+                                             
.mapToLong(DataSegment::getSize).sum();
+
+      RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, dataSource);
+      stats.add(Stats.Segments.USED_BYTES, datasourceKey, 
totalSizeOfUsedSegments);
+      stats.add(Stats.Segments.USED, datasourceKey, timeline.getNumObjects());
+    });
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java
index bc68f87bc5..7a97a61fa3 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/LoadQueueTaskMaster.java
@@ -20,22 +20,32 @@
 package org.apache.druid.server.coordinator.loading;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
 import com.google.inject.Provider;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.druid.client.ImmutableDruidServer;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.server.initialization.ZkPathsConfig;
 
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Provides LoadQueuePeons
  */
 public class LoadQueueTaskMaster
 {
+  private static final Logger log = new Logger(LoadQueueTaskMaster.class);
+
   private final Provider<CuratorFramework> curatorFrameworkProvider;
   private final ObjectMapper jsonMapper;
   private final ScheduledExecutorService peonExec;
@@ -45,6 +55,11 @@ public class LoadQueueTaskMaster
   private final ZkPathsConfig zkPaths;
   private final boolean httpLoading;
 
+  @GuardedBy("this")
+  private final AtomicBoolean isLeader = new AtomicBoolean(false);
+
+  private final ConcurrentHashMap<String, LoadQueuePeon> loadManagementPeons = 
new ConcurrentHashMap<>();
+
   public LoadQueueTaskMaster(
       Provider<CuratorFramework> curatorFrameworkProvider,
       ObjectMapper jsonMapper,
@@ -65,7 +80,7 @@ public class LoadQueueTaskMaster
     this.httpLoading = "http".equalsIgnoreCase(config.getLoadQueuePeonType());
   }
 
-  public LoadQueuePeon giveMePeon(ImmutableDruidServer server)
+  private LoadQueuePeon createPeon(ImmutableDruidServer server)
   {
     if (httpLoading) {
       return new HttpLoadQueuePeon(server.getURL(), jsonMapper, httpClient, 
config, peonExec, callbackExec);
@@ -81,6 +96,69 @@ public class LoadQueueTaskMaster
     }
   }
 
+  public Map<String, LoadQueuePeon> getAllPeons()
+  {
+    return loadManagementPeons;
+  }
+
+  public LoadQueuePeon getPeonForServer(ImmutableDruidServer server)
+  {
+    return loadManagementPeons.get(server.getName());
+  }
+
+  /**
+   * Creates a peon for each of the given servers, if it doesn't already exist 
and
+   * removes peons for servers not present in the cluster anymore.
+   * <p>
+   * This method must not run concurrently with {@link #onLeaderStart()} and
+   * {@link #onLeaderStop()} so that there are no stray peons if the 
Coordinator
+   * is not leader anymore.
+   */
+  public synchronized void resetPeonsForNewServers(List<ImmutableDruidServer> 
currentServers)
+  {
+    if (!isLeader.get()) {
+      return;
+    }
+
+    final Set<String> oldServers = 
Sets.newHashSet(loadManagementPeons.keySet());
+
+    // Start peons for new servers
+    for (ImmutableDruidServer server : currentServers) {
+      loadManagementPeons.computeIfAbsent(server.getName(), serverName -> {
+        LoadQueuePeon loadQueuePeon = createPeon(server);
+        loadQueuePeon.start();
+        log.debug("Created LoadQueuePeon for server[%s].", server.getName());
+        return loadQueuePeon;
+      });
+    }
+
+    // Remove peons for disappeared servers
+    for (ImmutableDruidServer server : currentServers) {
+      oldServers.remove(server.getName());
+    }
+    for (String name : oldServers) {
+      log.debug("Removing LoadQueuePeon for disappeared server[%s].", name);
+      LoadQueuePeon peon = loadManagementPeons.remove(name);
+      peon.stop();
+    }
+  }
+
+  public synchronized void onLeaderStart()
+  {
+    isLeader.set(true);
+  }
+
+  /**
+   * Stops and removes all peons.
+   */
+  public synchronized void onLeaderStop()
+  {
+    isLeader.set(false);
+
+    loadManagementPeons.values().forEach(LoadQueuePeon::stop);
+    loadManagementPeons.clear();
+  }
+
   public boolean isHttpLoading()
   {
     return httpLoading;
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 249dea2ce9..d1e5b7c5b2 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.ListeningExecutorService;
 import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2LongMap;
 import org.apache.curator.framework.CuratorFramework;
@@ -51,7 +50,6 @@ import org.apache.druid.metadata.SegmentsMetadataManager;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordination.ServerType;
 import 
org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
-import 
org.apache.druid.server.coordinator.balancer.RandomBalancerStrategyFactory;
 import org.apache.druid.server.coordinator.compact.NewestSegmentFirstPolicy;
 import org.apache.druid.server.coordinator.duty.CompactSegments;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
@@ -583,60 +581,6 @@ public class DruidCoordinatorTest extends CuratorTestBase
     EasyMock.verify(metadataRuleManager);
   }
 
-  @Test
-  public void testBalancerThreadNumber()
-  {
-    ScheduledExecutorFactory scheduledExecutorFactory = 
EasyMock.createNiceMock(ScheduledExecutorFactory.class);
-    EasyMock.replay(scheduledExecutorFactory);
-
-    DruidCoordinator c = new DruidCoordinator(
-        druidCoordinatorConfig,
-        EasyMock.createNiceMock(JacksonConfigManager.class),
-        null,
-        null,
-        null,
-        null,
-        scheduledExecutorFactory,
-        null,
-        loadQueueTaskMaster,
-        null,
-        null,
-        null,
-        null,
-        null,
-        new CoordinatorCustomDutyGroups(ImmutableSet.of()),
-        new RandomBalancerStrategyFactory(),
-        null,
-        null,
-        null
-    );
-
-    // before initialization
-    Assert.assertEquals(0, c.getCachedBalancerThreadNumber());
-    Assert.assertNull(c.getBalancerExec());
-
-    // first initialization
-    c.createBalancerStrategy(5);
-    Assert.assertEquals(5, c.getCachedBalancerThreadNumber());
-    ListeningExecutorService firstExec = c.getBalancerExec();
-    Assert.assertNotNull(firstExec);
-
-    // second initialization, expect no changes as cachedBalancerThreadNumber 
is not changed
-    c.createBalancerStrategy(5);
-    Assert.assertEquals(5, c.getCachedBalancerThreadNumber());
-    ListeningExecutorService secondExec = c.getBalancerExec();
-    Assert.assertNotNull(secondExec);
-    Assert.assertSame(firstExec, secondExec);
-
-    // third initialization, expect executor recreated as 
cachedBalancerThreadNumber is changed to 10
-    c.createBalancerStrategy(10);
-    Assert.assertEquals(10, c.getCachedBalancerThreadNumber());
-    ListeningExecutorService thirdExec = c.getBalancerExec();
-    Assert.assertNotNull(thirdExec);
-    Assert.assertNotSame(secondExec, thirdExec);
-    Assert.assertNotSame(firstExec, thirdExec);
-  }
-
   @Test
   public void testCompactSegmentsDutyWhenCustomDutyGroupEmpty()
   {
@@ -767,6 +711,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
   public void testCoordinatorCustomDutyGroupsRunAsExpected() throws Exception
   {
    // Some necessary setup to start the Coordinator
+    setupPeons(Collections.emptyMap());
     JacksonConfigManager configManager = 
EasyMock.createNiceMock(JacksonConfigManager.class);
     EasyMock.expect(
         configManager.watch(
@@ -805,10 +750,9 @@ public class DruidCoordinatorTest extends CuratorTestBase
     
EasyMock.expect(segmentsMetadataManager.isPollingDatabasePeriodically()).andReturn(true).anyTimes();
     EasyMock.expect(segmentsMetadataManager.iterateAllUsedSegments())
             .andReturn(Collections.singletonList(dataSegment)).anyTimes();
-    EasyMock.replay(segmentsMetadataManager);
     
EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
     
EasyMock.expect(serverInventoryView.getInventory()).andReturn(Collections.emptyList()).anyTimes();
-    EasyMock.replay(serverInventoryView);
+    EasyMock.replay(serverInventoryView, loadQueueTaskMaster, 
segmentsMetadataManager);
 
     // Create CoordinatorCustomDutyGroups
     // We will have two groups and each group has one duty
@@ -942,7 +886,16 @@ public class DruidCoordinatorTest extends CuratorTestBase
 
   private void setupPeons(Map<String, LoadQueuePeon> peonMap)
   {
-    
EasyMock.expect(loadQueueTaskMaster.giveMePeon(EasyMock.anyObject())).andAnswer(
+    loadQueueTaskMaster.resetPeonsForNewServers(EasyMock.anyObject());
+    EasyMock.expectLastCall().anyTimes();
+    loadQueueTaskMaster.onLeaderStart();
+    EasyMock.expectLastCall().anyTimes();
+    loadQueueTaskMaster.onLeaderStop();
+    EasyMock.expectLastCall().anyTimes();
+
+    
EasyMock.expect(loadQueueTaskMaster.getAllPeons()).andReturn(peonMap).anyTimes();
+
+    
EasyMock.expect(loadQueueTaskMaster.getPeonForServer(EasyMock.anyObject())).andAnswer(
         () -> peonMap.get(((ImmutableDruidServer) 
EasyMock.getCurrentArgument(0)).getName())
     ).anyTimes();
   }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactoryTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactoryTest.java
index d08b8ff104..f8234b36da 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactoryTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/BalancerStrategyFactoryTest.java
@@ -22,42 +22,45 @@ package org.apache.druid.server.coordinator.balancer;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.segment.TestHelper;
-import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 public class BalancerStrategyFactoryTest
 {
   private final ObjectMapper MAPPER = TestHelper.makeJsonMapper();
 
-  private ListeningExecutorService executorService;
-
-  @Before
-  public void setup()
-  {
-    executorService = MoreExecutors.listeningDecorator(
-        new BlockingExecutorService("StrategyFactoryTest-%s")
-    );
-  }
-
-  @After
-  public void tearDown()
-  {
-    executorService.shutdownNow();
-  }
-
   @Test
   public void testCachingCostStrategyFallsBackToCost() throws 
JsonProcessingException
   {
     final String json = "{\"strategy\":\"cachingCost\"}";
     BalancerStrategyFactory factory = MAPPER.readValue(json, 
BalancerStrategyFactory.class);
-    BalancerStrategy strategy = 
factory.createBalancerStrategy(executorService);
+    BalancerStrategy strategy = factory.createBalancerStrategy(1);
 
     Assert.assertTrue(strategy instanceof CostBalancerStrategy);
     Assert.assertFalse(strategy instanceof CachingCostBalancerStrategy);
+
+    factory.stopExecutor();
+  }
+
+  @Test
+  public void testBalancerFactoryCreatesNewExecutorIfNumThreadsChanges()
+  {
+    BalancerStrategyFactory factory = new CostBalancerStrategyFactory();
+    ListeningExecutorService exec1 = factory.getOrCreateBalancerExecutor(1);
+    ListeningExecutorService exec2 = factory.getOrCreateBalancerExecutor(2);
+
+    Assert.assertTrue(exec1.isShutdown());
+    Assert.assertNotSame(exec1, exec2);
+
+    ListeningExecutorService exec3 = factory.getOrCreateBalancerExecutor(3);
+    Assert.assertTrue(exec2.isShutdown());
+    Assert.assertNotSame(exec2, exec3);
+
+    ListeningExecutorService exec4 = factory.getOrCreateBalancerExecutor(3);
+    Assert.assertFalse(exec3.isShutdown());
+    Assert.assertSame(exec3, exec4);
+
+    factory.stopExecutor();
   }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
index aada7b3214..1f51565bcd 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
@@ -30,7 +30,7 @@ import org.apache.druid.server.coordinator.DruidCluster;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
-import 
org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
+import org.apache.druid.server.coordinator.balancer.CostBalancerStrategy;
 import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
 import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
@@ -92,7 +92,7 @@ public class BalanceSegmentsTest
     server4 = new DruidServer("server4", "server4", null, 100L, 
ServerType.HISTORICAL, "normal", 0);
 
     balancerStrategyExecutor = 
MoreExecutors.listeningDecorator(Execs.multiThreaded(1, 
"BalanceSegmentsTest-%d"));
-    balancerStrategy = new 
CostBalancerStrategyFactory().createBalancerStrategy(balancerStrategyExecutor);
+    balancerStrategy = new CostBalancerStrategy(balancerStrategyExecutor);
 
     broadcastDatasources = Collections.singleton("datasourceBroadcast");
   }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java
index 1090ef573e..9921281fde 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStatsTest.java
@@ -19,14 +19,14 @@
 
 package org.apache.druid.server.coordinator.duty;
 
-import it.unimi.dsi.fastutil.objects.Object2IntMaps;
-import it.unimi.dsi.fastutil.objects.Object2LongMaps;
+import com.google.common.collect.ImmutableMap;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.server.coordinator.DruidCluster;
-import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.balancer.RandomBalancerStrategy;
+import org.apache.druid.server.coordinator.loading.LoadQueueTaskMaster;
 import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
 import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.server.coordinator.stats.Stats;
 import org.junit.Assert;
@@ -36,13 +36,11 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import java.util.Collections;
-
 @RunWith(MockitoJUnitRunner.class)
 public class CollectSegmentAndServerStatsTest
 {
   @Mock
-  private DruidCoordinator mockDruidCoordinator;
+  private LoadQueueTaskMaster mockTaskMaster;
 
   @Test
   public void testCollectedSegmentStats()
@@ -55,17 +53,15 @@ public class CollectSegmentAndServerStatsTest
                                      .withSegmentAssignerUsing(new 
SegmentLoadQueueManager(null, null, null))
                                      .build();
 
-    Mockito.when(mockDruidCoordinator.getDatasourceToUnavailableSegmentCount())
-           .thenReturn(Object2IntMaps.singleton("ds", 10));
-    
Mockito.when(mockDruidCoordinator.getTierToDatasourceToUnderReplicatedCount(false))
-           .thenReturn(Collections.singletonMap("ds", 
Object2LongMaps.singleton("tier1", 100)));
+    Mockito.when(mockTaskMaster.getAllPeons())
+           .thenReturn(ImmutableMap.of("server1", new TestLoadQueuePeon()));
 
-    CoordinatorDuty duty = new 
CollectSegmentAndServerStats(mockDruidCoordinator);
+    CoordinatorDuty duty = new CollectSegmentAndServerStats(mockTaskMaster);
     DruidCoordinatorRuntimeParams params = duty.run(runtimeParams);
 
     CoordinatorRunStats stats = params.getCoordinatorStats();
-    Assert.assertTrue(stats.hasStat(Stats.Segments.UNAVAILABLE));
-    Assert.assertTrue(stats.hasStat(Stats.Segments.UNDER_REPLICATED));
+    Assert.assertTrue(stats.hasStat(Stats.SegmentQueue.NUM_TO_LOAD));
+    Assert.assertTrue(stats.hasStat(Stats.SegmentQueue.NUM_TO_DROP));
   }
 
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 6884d25975..2926dbd6d7 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -1774,7 +1774,7 @@ public class CompactSegmentsTest
   {
     DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
         .newBuilder(DateTimes.nowUtc())
-        .withSnapshotOfDataSourcesWithAllUsedSegments(dataSources)
+        .withDataSourcesSnapshot(dataSources)
         .withCompactionConfig(
             new CoordinatorCompactionConfig(
                 compactionConfigs,
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnusedTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnusedTest.java
index ce89cd616b..acbf89e322 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnusedTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnusedTest.java
@@ -91,7 +91,7 @@ public class MarkOvershadowedSegmentsAsUnusedTest
 
     DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
         .newBuilder(DateTimes.nowUtc())
-        .withSnapshotOfDataSourcesWithAllUsedSegments(
+        .withDataSourcesSnapshot(
             
segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments()
         )
         .withDruidCluster(druidCluster)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to