github-code-scanning[bot] commented on code in PR #13197:
URL: https://github.com/apache/druid/pull/13197#discussion_r1163048337


##########
server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java:
##########
@@ -117,397 +102,290 @@
   @Before
   public void setUp()
   {
-    EmittingLogger.registerEmitter(EMITTER);
-    EMITTER.start();
-    throttler = EasyMock.createMock(ReplicationThrottler.class);
-
     exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, 
"LoadRuleTest-%d"));
     balancerStrategy = new 
CostBalancerStrategyFactory().createBalancerStrategy(exec);
     cachingCostBalancerStrategy = new 
CachingCostBalancerStrategy(ClusterCostCache.builder().build(), exec);
 
     mockBalancerStrategy = EasyMock.createMock(BalancerStrategy.class);
+    stateManager = new SegmentStateManager(null, null, null);
   }
 
   @After
-  public void tearDown() throws Exception
+  public void tearDown()
   {
     exec.shutdown();
-    EMITTER.close();
   }
 
   @Test
   public void testLoad()
   {
-    
EasyMock.expect(throttler.canCreateReplicant(EasyMock.anyString())).andReturn(true).anyTimes();
-
     final LoadQueuePeon mockPeon = createEmptyPeon();
-    mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
+    mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), 
EasyMock.anyObject());
     EasyMock.expectLastCall().atLeastOnce();
 
-    LoadRule rule = createLoadRule(ImmutableMap.of(
-        "hot", 1,
-        DruidServer.DEFAULT_TIER, 2
-    ));
-
-    final DataSegment segment = createDataSegment("foo");
-
-    throttler.registerReplicantCreation(DruidServer.DEFAULT_TIER, 
segment.getId(), "hostNorm");
-    EasyMock.expectLastCall().once();
-
     if (!useRoundRobinAssignment) {
       
EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(),
 EasyMock.anyObject()))
               .andDelegateTo(balancerStrategy)
-              .times(3);
+              .times(2);
     }
-
-    EasyMock.replay(throttler, mockPeon, mockBalancerStrategy);
+    EasyMock.replay(mockPeon, mockBalancerStrategy);
 
     DruidCluster druidCluster = DruidClusterBuilder
         .newBuilder()
-        .addTier(
-            "hot",
-            new ServerHolder(
-                new DruidServer("serverHot", "hostHot", null, 1000, 
ServerType.HISTORICAL, "hot", 1)
-                    .toImmutableDruidServer(),
-                mockPeon
-            )
-        )
-        .addTier(
-            DruidServer.DEFAULT_TIER,
-            new ServerHolder(
-                new DruidServer(
-                    "serverNorm",
-                    "hostNorm",
-                    null,
-                    1000,
-                    ServerType.HISTORICAL,
-                    DruidServer.DEFAULT_TIER,
-                    0
-                ).toImmutableDruidServer(),
-                mockPeon
-            )
-        )
+        .addTier(Tier.T1, createServerHolder(Tier.T1, mockPeon, false))
+        .addTier(Tier.T2, createServerHolder(Tier.T2, mockPeon, false))
         .build();
 
-    CoordinatorStats stats = rule.run(null, 
makeCoordinatorRuntimeParams(druidCluster, segment), segment);
+    final DataSegment segment = createDataSegment(DS_WIKI);
+    LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1, Tier.T2, 2));
+    CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, 
druidCluster);
 
-    Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, 
"hot"));
-    Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, 
DruidServer.DEFAULT_TIER));
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
Tier.T1, DS_WIKI));
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, 
Tier.T2, DS_WIKI));
 
-    EasyMock.verify(throttler, mockPeon, mockBalancerStrategy);
+    EasyMock.verify(mockPeon, mockBalancerStrategy);
+  }
+
+  private CoordinatorRunStats runRuleAndGetStats(
+      LoadRule rule,
+      DataSegment segment,
+      DruidCluster cluster
+  )
+  {
+    return runRuleAndGetStats(rule, segment, 
makeCoordinatorRuntimeParams(cluster, segment));
+  }
+
+  private CoordinatorRunStats runRuleAndGetStats(
+      LoadRule rule,
+      DataSegment segment,
+      DruidCoordinatorRuntimeParams params
+  )
+  {
+    final CoordinatorDynamicConfig dynamicConfig = 
params.getCoordinatorDynamicConfig();
+    ReplicationThrottler throttler = new ReplicationThrottler(
+        Sets.newHashSet(params.getDruidCluster().getTierNames()),
+        dynamicConfig.getReplicationThrottleLimit(),
+        dynamicConfig.getReplicantLifetime(),
+        dynamicConfig.getMaxNonPrimaryReplicantsToLoad()

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [CoordinatorDynamicConfig.getMaxNonPrimaryReplicantsToLoad](1) 
should be avoided because it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4775)



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java:
##########
@@ -163,9 +144,74 @@
          .emit();
     }
 
+    segmentLoader.makeAlerts();
+
+    final CoordinatorRunStats stats = segmentLoader.getStats();
+    stats.forEachRowKey(
+        (dimensionValues, statValues) ->
+            log.info("Stats for dimensions [%s]: [%s]", dimensionValues, 
statValues)
+    );
+
     return params.buildFromExisting()
                  .withCoordinatorStats(stats)
                  .withBroadcastDatasources(broadcastDatasources)
+                 .withReplicationManager(replicationThrottler)
                  .build();
   }
+
+  /**
+   * Reduces the lifetimes of segments currently being replicated in all the 
tiers.
+   * Returns the set of tiers that are currently replicating some segments and
+   * won't be eligible for assigning more replicas in this run.
+   */
+  private Set<String> reduceLifetimesAndGetBusyTiers(CoordinatorDynamicConfig 
dynamicConfig)
+  {
+    final Set<String> busyTiers = new HashSet<>();
+    stateManager.reduceLifetimesOfReplicatingSegments().forEach((tier, 
replicatingState) -> {
+      int numReplicatingSegments = replicatingState.getNumProcessingSegments();
+      if (numReplicatingSegments <= 0) {
+        return;
+      }
+
+      busyTiers.add(tier);
+      log.info(
+          "Skipping replication on tier [%s] as it still has"
+          + " [%d] segments in queue with lifetime [%d / %d].",
+          tier,
+          numReplicatingSegments,
+          replicatingState.getMinLifetime(),
+          dynamicConfig.getReplicantLifetime()
+      );
+
+      // Create alerts for stuck tiers
+      if (replicatingState.getMinLifetime() <= 0) {
+        log.makeAlert(
+            "Replication queue for tier [%s] has [%d] segments stuck.",
+            tier,
+            replicatingState.getNumExpiredSegments()
+        ).addData("segments", replicatingState.getExpiredSegments()).emit();
+      }
+    });
+
+    return busyTiers;
+  }
+
+  private ReplicationThrottler createReplicationThrottler(
+      Set<String> busyTiers,
+      DruidCluster cluster,
+      CoordinatorDynamicConfig dynamicConfig
+  )
+  {
+    // Tiers that already have some replication in progress are not eligible 
for
+    // replication in this coordinator run
+    final Set<String> tiersEligibleForReplication = 
Sets.newHashSet(cluster.getTierNames());
+    tiersEligibleForReplication.removeAll(busyTiers);
+
+    return new ReplicationThrottler(
+        tiersEligibleForReplication,
+        dynamicConfig.getReplicationThrottleLimit(),
+        dynamicConfig.getReplicantLifetime(),
+        dynamicConfig.getMaxNonPrimaryReplicantsToLoad()

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [CoordinatorDynamicConfig.getMaxNonPrimaryReplicantsToLoad](1) 
should be avoided because it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4771)



##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java:
##########
@@ -713,75 +691,74 @@
       return this;
     }
 
+    /**
+     * Builds a CoordinatorDynamicConfig using either the configured values, 
or
+     * the default value if not configured.
+     */
     public CoordinatorDynamicConfig build()
     {
       return new CoordinatorDynamicConfig(
-          leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments == null
-          ? 
DEFAULT_LEADING_TIME_MILLIS_BEFORE_CAN_MARK_AS_UNUSED_OVERSHADOWED_SEGMENTS
-          : leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
-          mergeBytesLimit == null ? DEFAULT_MERGE_BYTES_LIMIT : 
mergeBytesLimit,
-          mergeSegmentsLimit == null ? DEFAULT_MERGE_SEGMENTS_LIMIT : 
mergeSegmentsLimit,
-          maxSegmentsToMove == null ? DEFAULT_MAX_SEGMENTS_TO_MOVE : 
maxSegmentsToMove,
-          percentOfSegmentsToConsiderPerMove == null ? 
DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
-                                                     : 
percentOfSegmentsToConsiderPerMove,
-          useBatchedSegmentSampler == null ? 
DEFAULT_USE_BATCHED_SEGMENT_SAMPLER : useBatchedSegmentSampler,
-          replicantLifetime == null ? DEFAULT_REPLICANT_LIFETIME : 
replicantLifetime,
-          replicationThrottleLimit == null ? 
DEFAULT_REPLICATION_THROTTLE_LIMIT : replicationThrottleLimit,
-          balancerComputeThreads == null ? DEFAULT_BALANCER_COMPUTE_THREADS : 
balancerComputeThreads,
-          emitBalancingStats == null ? DEFAULT_EMIT_BALANCING_STATS : 
emitBalancingStats,
+          valueOrDefault(
+              leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
+              Defaults.LEADING_MILLIS_BEFORE_MARK_UNUSED
+          ),
+          valueOrDefault(mergeBytesLimit, Defaults.MERGE_BYTES_LIMIT),
+          valueOrDefault(mergeSegmentsLimit, Defaults.MERGE_SEGMENTS_LIMIT),
+          valueOrDefault(maxSegmentsToMove, Defaults.MAX_SEGMENTS_TO_MOVE),
+          valueOrDefault(percentOfSegmentsToConsiderPerMove, 
Defaults.PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE),
+          valueOrDefault(useBatchedSegmentSampler, 
Defaults.USE_BATCHED_SEGMENT_SAMPLER),
+          valueOrDefault(replicantLifetime, Defaults.REPLICANT_LIFETIME),
+          valueOrDefault(replicationThrottleLimit, 
Defaults.REPLICATION_THROTTLE_LIMIT),
+          valueOrDefault(balancerComputeThreads, 
Defaults.BALANCER_COMPUTE_THREADS),
+          valueOrDefault(emitBalancingStats, Defaults.EMIT_BALANCING_STATS),
           specificDataSourcesToKillUnusedSegmentsIn,
           dataSourcesToNotKillStalePendingSegmentsIn,
-          maxSegmentsInNodeLoadingQueue == null
-          ? DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE
-          : maxSegmentsInNodeLoadingQueue,
+          valueOrDefault(maxSegmentsInNodeLoadingQueue, 
Defaults.MAX_SEGMENTS_IN_NODE_LOADING_QUEUE),
           decommissioningNodes,
-          decommissioningMaxPercentOfMaxSegmentsToMove == null
-          ? DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT
-          : decommissioningMaxPercentOfMaxSegmentsToMove,
-          pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : 
pauseCoordination,
-          replicateAfterLoadTimeout == null ? 
DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout,
-          maxNonPrimaryReplicantsToLoad == null ? 
DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD
-                                                : 
maxNonPrimaryReplicantsToLoad,
-          useRoundRobinSegmentAssignment == null ? 
DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT : useRoundRobinSegmentAssignment
+          valueOrDefault(
+              decommissioningMaxPercentOfMaxSegmentsToMove,
+              Defaults.DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT
+          ),
+          valueOrDefault(pauseCoordination, Defaults.PAUSE_COORDINATION),
+          valueOrDefault(replicateAfterLoadTimeout, 
Defaults.REPLICATE_AFTER_LOAD_TIMEOUT),
+          valueOrDefault(maxNonPrimaryReplicantsToLoad, 
Defaults.MAX_NON_PRIMARY_REPLICANTS_TO_LOAD),
+          valueOrDefault(useRoundRobinSegmentAssignment, 
Defaults.USE_ROUND_ROBIN_ASSIGNMENT)
       );
     }
 
+    private static <T> T valueOrDefault(@Nullable T value, @NotNull T 
defaultValue)
+    {
+      return value == null ? defaultValue : value;
+    }
+
     public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
     {
       return new CoordinatorDynamicConfig(
-          leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments == null
-          ? 
defaults.getLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments()
-          : leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
-          mergeBytesLimit == null ? defaults.getMergeBytesLimit() : 
mergeBytesLimit,
-          mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() : 
mergeSegmentsLimit,
-          maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() : 
maxSegmentsToMove,
-          percentOfSegmentsToConsiderPerMove == null
-          ? defaults.getPercentOfSegmentsToConsiderPerMove()
-          : percentOfSegmentsToConsiderPerMove,
-          useBatchedSegmentSampler == null ? 
defaults.useBatchedSegmentSampler() : useBatchedSegmentSampler,
-          replicantLifetime == null ? defaults.getReplicantLifetime() : 
replicantLifetime,
-          replicationThrottleLimit == null ? 
defaults.getReplicationThrottleLimit() : replicationThrottleLimit,
-          balancerComputeThreads == null ? 
defaults.getBalancerComputeThreads() : balancerComputeThreads,
-          emitBalancingStats == null ? defaults.emitBalancingStats() : 
emitBalancingStats,
-          specificDataSourcesToKillUnusedSegmentsIn == null
-          ? defaults.getSpecificDataSourcesToKillUnusedSegmentsIn()
-          : specificDataSourcesToKillUnusedSegmentsIn,
-          dataSourcesToNotKillStalePendingSegmentsIn == null
-          ? defaults.getDataSourcesToNotKillStalePendingSegmentsIn()
-          : dataSourcesToNotKillStalePendingSegmentsIn,
-          maxSegmentsInNodeLoadingQueue == null
-          ? defaults.getMaxSegmentsInNodeLoadingQueue()
-          : maxSegmentsInNodeLoadingQueue,
-          decommissioningNodes == null ? defaults.getDecommissioningNodes() : 
decommissioningNodes,
-          decommissioningMaxPercentOfMaxSegmentsToMove == null
-          ? defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove()
-          : decommissioningMaxPercentOfMaxSegmentsToMove,
-          pauseCoordination == null ? defaults.getPauseCoordination() : 
pauseCoordination,
-          replicateAfterLoadTimeout == null ? 
defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout,
-          maxNonPrimaryReplicantsToLoad == null
-          ? defaults.getMaxNonPrimaryReplicantsToLoad()
-          : maxNonPrimaryReplicantsToLoad,
-          useRoundRobinSegmentAssignment == null ? 
defaults.isUseRoundRobinSegmentAssignment() : useRoundRobinSegmentAssignment
+          valueOrDefault(
+              leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
+              
defaults.getLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments()
+          ),
+          valueOrDefault(mergeBytesLimit, defaults.getMergeBytesLimit()),
+          valueOrDefault(mergeSegmentsLimit, defaults.getMergeSegmentsLimit()),
+          valueOrDefault(maxSegmentsToMove, defaults.getMaxSegmentsToMove()),
+          valueOrDefault(percentOfSegmentsToConsiderPerMove, 
defaults.getPercentOfSegmentsToConsiderPerMove()),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [CoordinatorDynamicConfig.getPercentOfSegmentsToConsiderPerMove](1) 
should be avoided because it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4766)



##########
server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java:
##########
@@ -497,4 +463,27 @@
                       .size(0)
                       .build();
   }
+
+  private SegmentLoader createSegmentLoader(
+      SegmentStateManager stateManager,
+      DruidCoordinatorRuntimeParams params,
+      String... tiersEligibleForReplication
+  )
+  {
+    final CoordinatorDynamicConfig dynamicConfig = 
params.getCoordinatorDynamicConfig();
+    ReplicationThrottler throttler = new ReplicationThrottler(
+        new HashSet<>(Arrays.asList(tiersEligibleForReplication)),
+        dynamicConfig.getReplicationThrottleLimit(),
+        dynamicConfig.getReplicantLifetime(),
+        dynamicConfig.getMaxNonPrimaryReplicantsToLoad()

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [CoordinatorDynamicConfig.getMaxNonPrimaryReplicantsToLoad](1) 
should be avoided because it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4773)



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java:
##########
@@ -20,292 +20,326 @@
 package org.apache.druid.server.coordinator.duty;
 
 import com.google.common.collect.Lists;
-import org.apache.druid.client.ImmutableDruidServer;
-import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.server.coordinator.BalancerSegmentHolder;
 import org.apache.druid.server.coordinator.BalancerStrategy;
-import org.apache.druid.server.coordinator.CoordinatorStats;
-import org.apache.druid.server.coordinator.DruidCoordinator;
+import org.apache.druid.server.coordinator.DruidCluster;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
-import org.apache.druid.server.coordinator.LoadPeonCallback;
-import org.apache.druid.server.coordinator.LoadQueuePeon;
+import org.apache.druid.server.coordinator.SegmentLoader;
+import org.apache.druid.server.coordinator.SegmentStateManager;
 import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableSet;
 import java.util.SortedSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 
 /**
+ *
  */
 public class BalanceSegments implements CoordinatorDuty
 {
   protected static final EmittingLogger log = new 
EmittingLogger(BalanceSegments.class);
+  private final SegmentStateManager stateManager;
 
-  protected final DruidCoordinator coordinator;
-
-  protected final Map<String, ConcurrentHashMap<SegmentId, 
BalancerSegmentHolder>> currentlyMovingSegments =
-      new HashMap<>();
-
-  public BalanceSegments(DruidCoordinator coordinator)
+  public BalanceSegments(SegmentStateManager stateManager)
   {
-    this.coordinator = coordinator;
+    this.stateManager = stateManager;
   }
 
-  protected void reduceLifetimes(String tier)
+  /**
+   * Reduces the lifetimes of segments currently being moved in all the tiers.
+   * Raises alerts for segments stuck in the queue.
+   */
+  private void reduceLifetimesAndAlert()
   {
-    for (BalancerSegmentHolder holder : 
currentlyMovingSegments.get(tier).values()) {
-      holder.reduceLifetime();
-      if (holder.getLifetime() <= 0) {
-        log.makeAlert("[%s]: Balancer move segments queue has a segment 
stuck", tier)
-           .addData("segment", holder.getSegment().getId())
-           .addData("server", holder.getFromServer().getMetadata())
-           .emit();
+    stateManager.reduceLifetimesOfMovingSegments().forEach((tier, movingState) 
-> {
+      int numMovingSegments = movingState.getNumProcessingSegments();
+      if (numMovingSegments <= 0) {
+        return;
       }
-    }
+
+      // Create alerts for stuck tiers
+      if (movingState.getMinLifetime() <= 0) {
+        log.makeAlert(
+            "Balancing queue for tier [%s] has [%d] segments stuck.",
+            tier,
+            movingState.getNumExpiredSegments()
+        ).addData("segments", movingState.getExpiredSegments()).emit();
+      }
+    });
   }
 
   @Override
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams 
params)
   {
-    final CoordinatorStats stats = new CoordinatorStats();
-    params.getDruidCluster().getHistoricals().forEach((String tier, 
NavigableSet<ServerHolder> servers) -> {
-      balanceTier(params, tier, servers, stats);
-    });
+    reduceLifetimesAndAlert();
+
+    if (params.getUsedSegments().isEmpty()) {
+      log.info("Skipping balance as there are no used segments.");
+      return params;
+    }
+
+    final DruidCluster cluster = params.getDruidCluster();
+    final int maxSegmentsToMove = 
params.getCoordinatorDynamicConfig().getMaxSegmentsToMove();
+    if (maxSegmentsToMove <= 0) {
+      log.info("Skipping balance as maxSegmentsToMove is [%d].", 
maxSegmentsToMove);
+      return params;
+    } else {
+      int maxLifetime = 
params.getCoordinatorDynamicConfig().getReplicantLifetime();
+      log.info(
+          "Balancing segments in tiers [%s] with maxSegmentsToMove=[%d], 
maxLifetime=[%d].",
+          cluster.getTierNames(), maxSegmentsToMove, maxLifetime
+      );
+    }
+
+    final CoordinatorRunStats stats = new CoordinatorRunStats();
+    final SegmentLoader loader = new SegmentLoader(
+        stateManager,
+        params.getDruidCluster(),
+        params.getSegmentReplicantLookup(),
+        params.getReplicationManager(),
+        params.getBalancerStrategy(),
+        params.getCoordinatorDynamicConfig().isUseRoundRobinSegmentAssignment()
+    );
+
+    cluster.getHistoricals().forEach(
+        (tier, servers) -> balanceTier(params, tier, servers, stats, loader)
+    );
+
+    loader.makeAlerts();
+    stats.accumulate(loader.getStats());
+
+    stats.forEachRowKey(
+        (dimensionValues, statValues) ->
+            log.info("Stats for dimensions [%s] are [%s]", dimensionValues, 
statValues)
+    );
+
     return params.buildFromExisting().withCoordinatorStats(stats).build();
   }
 
+  /**
+   * Moves as many segments as possible from decommissioning to active servers,
+   * then moves segments amongst active servers.
+   */
   private void balanceTier(
       DruidCoordinatorRuntimeParams params,
       String tier,
       SortedSet<ServerHolder> servers,
-      CoordinatorStats stats
+      CoordinatorRunStats stats,
+      SegmentLoader loader
   )
   {
-
-    log.info("Balancing segments in tier [%s]", tier);
-    if (params.getUsedSegments().size() == 0) {
-      log.info("Metadata segments are not available. Cannot balance.");
-      // suppress emit zero stats
-      return;
-    }
-    currentlyMovingSegments.computeIfAbsent(tier, t -> new 
ConcurrentHashMap<>());
-
-    if (!currentlyMovingSegments.get(tier).isEmpty()) {
-      reduceLifetimes(tier);
-      log.info(
-          "[%s]: Still waiting on %,d segments to be moved. Skipping balance.",
-          tier,
-          currentlyMovingSegments.get(tier).size()
-      );
-      // suppress emit zero stats
-      return;
-    }
-
-    /*
-      Take as many segments from decommissioning servers as 
decommissioningMaxPercentOfMaxSegmentsToMove allows and find
-      the best location for them on active servers. After that, balance 
segments within active servers pool.
-     */
     Map<Boolean, List<ServerHolder>> partitions =
         
servers.stream().collect(Collectors.partitioningBy(ServerHolder::isDecommissioning));
     final List<ServerHolder> decommissioningServers = partitions.get(true);
     final List<ServerHolder> activeServers = partitions.get(false);
+
     log.info(
-        "Found %d active servers, %d decommissioning servers",
-        activeServers.size(),
-        decommissioningServers.size()
+        "Balancing segments in tier [%s] with [%d] activeServers and [%d] 
decommissioning servers.",
+        tier, activeServers.size(), decommissioningServers.size()
     );
 
     if ((decommissioningServers.isEmpty() && activeServers.size() <= 1) || 
activeServers.isEmpty()) {
-      log.warn("[%s]: insufficient active servers. Cannot balance.", tier);
-      // suppress emit zero stats
+      log.warn("Skipping balance for tier [%s] as there are [%d] active 
servers.", tier, activeServers.size());
       return;
     }
 
-    int numSegments = 0;
-    for (ServerHolder sourceHolder : servers) {
-      numSegments += sourceHolder.getServer().getNumSegments();
-    }
-
-    if (numSegments == 0) {
-      log.info("No segments found. Cannot balance.");
-      // suppress emit zero stats
+    final int maxSegmentsToMove = 
params.getCoordinatorDynamicConfig().getMaxSegmentsToMove();
+    if (maxSegmentsToMove <= 0) {
       return;
     }
 
-    final int maxSegmentsToMove = 
Math.min(params.getCoordinatorDynamicConfig().getMaxSegmentsToMove(), 
numSegments);
+    // Move segments from decommissioning to active servers
+    int maxDecommPercentToMove = params.getCoordinatorDynamicConfig()
+                                       
.getDecommissioningMaxPercentOfMaxSegmentsToMove();

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking 
[CoordinatorDynamicConfig.getDecommissioningMaxPercentOfMaxSegmentsToMove](1) 
should be avoided because it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4770)



##########
server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java:
##########
@@ -626,189 +422,115 @@
         .withBroadcastDatasources(broadcastDatasources)
         .build();
 
-    params = new BalanceSegmentsTester(coordinator).run(params);
-    Assert.assertEquals(2L, 
params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
+    CoordinatorRunStats stats = runBalancer(params);
+    Assert.assertEquals(2L, stats.getTieredStat(Stats.Segments.MOVED, 
"normal"));
   }
 
-  private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder(
-      List<ImmutableDruidServer> druidServers,
-      List<LoadQueuePeon> peons
-  )
+  private CoordinatorRunStats runBalancer(DruidCoordinatorRuntimeParams params)
   {
-    return defaultRuntimeParamsBuilder(
-        druidServers,
-        peons,
-        druidServers.stream().map(s -> false).collect(Collectors.toList())
-    );
+    params = new BalanceSegments(stateManager).run(params);
+    if (params == null) {
+      Assert.fail("BalanceSegments duty returned null params");
+      return new CoordinatorRunStats();
+    } else {
+      return params.getCoordinatorStats();
+    }
   }
 
   private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder(
-      List<ImmutableDruidServer> druidServers,
-      List<LoadQueuePeon> peons,
-      List<Boolean> decommissioning
+      ServerHolder... servers
   )
   {
     return CoordinatorRuntimeParamsTestHelpers
         .newBuilder()
-        .withDruidCluster(
-            DruidClusterBuilder
-                .newBuilder()
-                .addTier(
-                    "normal",
-                    IntStream
-                        .range(0, druidServers.size())
-                        .mapToObj(i -> new ServerHolder(druidServers.get(i), 
peons.get(i), decommissioning.get(i)))
-                        .toArray(ServerHolder[]::new)
-                )
-                .build()
-        )
-        .withLoadManagementPeons(
-            IntStream
-                .range(0, peons.size())
-                .boxed()
-                .collect(Collectors.toMap(i -> String.valueOf(i + 1), 
peons::get))
-        )
-        .withUsedSegmentsInTest(segments)
-        
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
+        .withDruidCluster(DruidClusterBuilder.newBuilder().addTier("normal", 
servers).build())
+        .withUsedSegmentsInTest(allSegments)
         .withBroadcastDatasources(broadcastDatasources)
-        .withBalancerStrategy(balancerStrategy);
+        .withBalancerStrategy(balancerStrategy)
+        .withReplicationManager(createReplicationThrottler());
   }
 
-  private static void mockDruidServer(
-      ImmutableDruidServer druidServer,
-      String name,
-      String tier,
-      long currentSize,
-      long maxSize,
-      List<DataSegment> segments
-  )
+  private ServerHolder createHolder(DruidServer server, DataSegment... 
loadedSegments)
   {
-    EasyMock.expect(druidServer.getName()).andReturn(name).anyTimes();
-    EasyMock.expect(druidServer.getTier()).andReturn(tier).anyTimes();
-    
EasyMock.expect(druidServer.getCurrSize()).andReturn(currentSize).atLeastOnce();
-    EasyMock.expect(druidServer.getMaxSize()).andReturn(maxSize).atLeastOnce();
-    ImmutableDruidServerTests.expectSegments(druidServer, segments);
-    EasyMock.expect(druidServer.getHost()).andReturn(name).anyTimes();
-    
EasyMock.expect(druidServer.getType()).andReturn(ServerType.HISTORICAL).anyTimes();
-    if (!segments.isEmpty()) {
-      segments.forEach(
-          s -> 
EasyMock.expect(druidServer.getSegment(s.getId())).andReturn(s).anyTimes()
-      );
-    }
-    
EasyMock.expect(druidServer.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
-    EasyMock.replay(druidServer);
+    return createHolder(server, false, loadedSegments);
   }
 
-  private static void mockCoordinator(DruidCoordinator coordinator)
+  private ServerHolder createHolder(DruidServer server, boolean 
isDecommissioning, DataSegment... loadedSegments)
   {
-    coordinator.moveSegment(
-        EasyMock.anyObject(),
-        EasyMock.anyObject(),
-        EasyMock.anyObject(),
-        EasyMock.anyObject(),
-        EasyMock.anyObject()
-    );
-    EasyMock.expectLastCall().anyTimes();
-    EasyMock.replay(coordinator);
+    return createHolder(server, 0, isDecommissioning, loadedSegments);
   }
 
-  private static class PredefinedPickOrderBalancerStrategy implements 
BalancerStrategy
+  private ServerHolder createHolder(
+      DruidServer server,
+      int maxSegmentsInLoadQueue,
+      boolean isDecommissioning,
+      DataSegment... loadedSegments
+  )
   {
-    private final BalancerStrategy delegate;
-    private final List<BalancerSegmentHolder> pickOrder;
-    private final AtomicInteger pickCounter = new AtomicInteger(0);
-
-    PredefinedPickOrderBalancerStrategy(
-        BalancerStrategy delegate,
-        List<BalancerSegmentHolder> pickOrder
-    )
-    {
-      this.delegate = delegate;
-      this.pickOrder = pickOrder;
-    }
-
-    @Override
-    public ServerHolder findNewSegmentHomeBalancer(DataSegment 
proposalSegment, List<ServerHolder> serverHolders)
-    {
-      return delegate.findNewSegmentHomeBalancer(proposalSegment, 
serverHolders);
-    }
-
-    @Override
-    public ServerHolder findNewSegmentHomeReplicator(DataSegment 
proposalSegment, List<ServerHolder> serverHolders)
-    {
-      return delegate.findNewSegmentHomeReplicator(proposalSegment, 
serverHolders);
-    }
-
-    @Override
-    public Iterator<BalancerSegmentHolder> pickSegmentsToMove(
-        List<ServerHolder> serverHolders,
-        Set<String> broadcastDatasources,
-        int numberOfSegments
-    )
-    {
-      return pickOrder.iterator();
-    }
-
-    @Override
-    public Iterator<BalancerSegmentHolder> pickSegmentsToMove(
-        List<ServerHolder> serverHolders,
-        Set<String> broadcastDatasources,
-        double percentOfSegmentsToConsider
-    )
-    {
-      return pickOrder.iterator();
+    for (DataSegment segment : loadedSegments) {
+      server.addDataSegment(segment);
     }
 
-    @Override
-    public void emitStats(String tier, CoordinatorStats stats, 
List<ServerHolder> serverHolderList)
-    {
-      delegate.emitStats(tier, stats, serverHolderList);
-    }
+    return new ServerHolder(
+        server.toImmutableDruidServer(),
+        new LoadQueuePeonTester(),
+        isDecommissioning,
+        maxSegmentsInLoadQueue
+    );
   }
 
-  private DruidCoordinatorRuntimeParams 
setupParamsForDecommissioningMaxPercentOfMaxSegmentsToMove(int percent)
+  private IExpectationSetters<Iterator<BalancerSegmentHolder>> 
expectPickLoadingSegmentsAndReturnEmpty(
+      BalancerStrategy strategy
+  )
   {
-    mockDruidServer(druidServer1, "1", "normal", 30L, 100L, 
Arrays.asList(segment1, segment3));
-    mockDruidServer(druidServer2, "2", "normal", 30L, 100L, 
Arrays.asList(segment2, segment3));
-    mockDruidServer(druidServer3, "3", "normal", 0L, 100L, 
Collections.emptyList());
-
-    EasyMock.replay(druidServer4);
-
-    mockCoordinator(coordinator);
+    return EasyMock.expect(
+        strategy.pickSegmentsToMove(
+            EasyMock.anyObject(),
+            EasyMock.anyObject(),
+            EasyMock.anyInt(),
+            EasyMock.eq(true)
+        )
+    ).andReturn(Collections.emptyIterator());
+  }
 
-    // either decommissioning servers list or acitve ones (ie servers list is 
[2] or [1, 3])
-    BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-    EasyMock.expect(
+  private IExpectationSetters<Iterator<BalancerSegmentHolder>> 
expectPickLoadedSegmentsAndReturn(
+      BalancerStrategy strategy,
+      BalancerSegmentHolder... pickedLoadedSegments
+  )
+  {
+    return EasyMock.expect(
         strategy.pickSegmentsToMove(
-            ImmutableList.of(
-                new ServerHolder(druidServer2, peon2, true)
-            ),
-            broadcastDatasources,
-            100.0
+            EasyMock.anyObject(),
+            EasyMock.anyObject(),
+            EasyMock.anyInt(),
+            EasyMock.eq(false)
         )
-    ).andReturn(
-        ImmutableList.of(new BalancerSegmentHolder(druidServer2, 
segment2)).iterator()
+    ).andReturn(Arrays.asList(pickedLoadedSegments).iterator());
+  }
+
+  private ReplicationThrottler createReplicationThrottler()
+  {
+    CoordinatorDynamicConfig dynamicConfig = 
CoordinatorDynamicConfig.builder().build();
+    return new ReplicationThrottler(
+        Collections.singleton("normal"),
+        dynamicConfig.getReplicationThrottleLimit(),
+        dynamicConfig.getReplicantLifetime(),
+        dynamicConfig.getMaxNonPrimaryReplicantsToLoad()

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [CoordinatorDynamicConfig.getMaxNonPrimaryReplicantsToLoad](1) 
should be avoided because it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4772)



##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java:
##########
@@ -713,75 +691,74 @@
       return this;
     }
 
+    /**
+     * Builds a CoordinatorDynamicConfig using either the configured values, 
or
+     * the default value if not configured.
+     */
     public CoordinatorDynamicConfig build()
     {
       return new CoordinatorDynamicConfig(
-          leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments == null
-          ? 
DEFAULT_LEADING_TIME_MILLIS_BEFORE_CAN_MARK_AS_UNUSED_OVERSHADOWED_SEGMENTS
-          : leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
-          mergeBytesLimit == null ? DEFAULT_MERGE_BYTES_LIMIT : 
mergeBytesLimit,
-          mergeSegmentsLimit == null ? DEFAULT_MERGE_SEGMENTS_LIMIT : 
mergeSegmentsLimit,
-          maxSegmentsToMove == null ? DEFAULT_MAX_SEGMENTS_TO_MOVE : 
maxSegmentsToMove,
-          percentOfSegmentsToConsiderPerMove == null ? 
DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
-                                                     : 
percentOfSegmentsToConsiderPerMove,
-          useBatchedSegmentSampler == null ? 
DEFAULT_USE_BATCHED_SEGMENT_SAMPLER : useBatchedSegmentSampler,
-          replicantLifetime == null ? DEFAULT_REPLICANT_LIFETIME : 
replicantLifetime,
-          replicationThrottleLimit == null ? 
DEFAULT_REPLICATION_THROTTLE_LIMIT : replicationThrottleLimit,
-          balancerComputeThreads == null ? DEFAULT_BALANCER_COMPUTE_THREADS : 
balancerComputeThreads,
-          emitBalancingStats == null ? DEFAULT_EMIT_BALANCING_STATS : 
emitBalancingStats,
+          valueOrDefault(
+              leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
+              Defaults.LEADING_MILLIS_BEFORE_MARK_UNUSED
+          ),
+          valueOrDefault(mergeBytesLimit, Defaults.MERGE_BYTES_LIMIT),
+          valueOrDefault(mergeSegmentsLimit, Defaults.MERGE_SEGMENTS_LIMIT),
+          valueOrDefault(maxSegmentsToMove, Defaults.MAX_SEGMENTS_TO_MOVE),
+          valueOrDefault(percentOfSegmentsToConsiderPerMove, 
Defaults.PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE),
+          valueOrDefault(useBatchedSegmentSampler, 
Defaults.USE_BATCHED_SEGMENT_SAMPLER),
+          valueOrDefault(replicantLifetime, Defaults.REPLICANT_LIFETIME),
+          valueOrDefault(replicationThrottleLimit, 
Defaults.REPLICATION_THROTTLE_LIMIT),
+          valueOrDefault(balancerComputeThreads, 
Defaults.BALANCER_COMPUTE_THREADS),
+          valueOrDefault(emitBalancingStats, Defaults.EMIT_BALANCING_STATS),
           specificDataSourcesToKillUnusedSegmentsIn,
           dataSourcesToNotKillStalePendingSegmentsIn,
-          maxSegmentsInNodeLoadingQueue == null
-          ? DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE
-          : maxSegmentsInNodeLoadingQueue,
+          valueOrDefault(maxSegmentsInNodeLoadingQueue, 
Defaults.MAX_SEGMENTS_IN_NODE_LOADING_QUEUE),
           decommissioningNodes,
-          decommissioningMaxPercentOfMaxSegmentsToMove == null
-          ? DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT
-          : decommissioningMaxPercentOfMaxSegmentsToMove,
-          pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : 
pauseCoordination,
-          replicateAfterLoadTimeout == null ? 
DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout,
-          maxNonPrimaryReplicantsToLoad == null ? 
DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD
-                                                : 
maxNonPrimaryReplicantsToLoad,
-          useRoundRobinSegmentAssignment == null ? 
DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT : useRoundRobinSegmentAssignment
+          valueOrDefault(
+              decommissioningMaxPercentOfMaxSegmentsToMove,
+              Defaults.DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT
+          ),
+          valueOrDefault(pauseCoordination, Defaults.PAUSE_COORDINATION),
+          valueOrDefault(replicateAfterLoadTimeout, 
Defaults.REPLICATE_AFTER_LOAD_TIMEOUT),
+          valueOrDefault(maxNonPrimaryReplicantsToLoad, 
Defaults.MAX_NON_PRIMARY_REPLICANTS_TO_LOAD),
+          valueOrDefault(useRoundRobinSegmentAssignment, 
Defaults.USE_ROUND_ROBIN_ASSIGNMENT)
       );
     }
 
+    private static <T> T valueOrDefault(@Nullable T value, @NotNull T 
defaultValue)
+    {
+      return value == null ? defaultValue : value;
+    }
+
     public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
     {
       return new CoordinatorDynamicConfig(
-          leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments == null
-          ? 
defaults.getLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments()
-          : leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
-          mergeBytesLimit == null ? defaults.getMergeBytesLimit() : 
mergeBytesLimit,
-          mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() : 
mergeSegmentsLimit,
-          maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() : 
maxSegmentsToMove,
-          percentOfSegmentsToConsiderPerMove == null
-          ? defaults.getPercentOfSegmentsToConsiderPerMove()
-          : percentOfSegmentsToConsiderPerMove,
-          useBatchedSegmentSampler == null ? 
defaults.useBatchedSegmentSampler() : useBatchedSegmentSampler,
-          replicantLifetime == null ? defaults.getReplicantLifetime() : 
replicantLifetime,
-          replicationThrottleLimit == null ? 
defaults.getReplicationThrottleLimit() : replicationThrottleLimit,
-          balancerComputeThreads == null ? 
defaults.getBalancerComputeThreads() : balancerComputeThreads,
-          emitBalancingStats == null ? defaults.emitBalancingStats() : 
emitBalancingStats,
-          specificDataSourcesToKillUnusedSegmentsIn == null
-          ? defaults.getSpecificDataSourcesToKillUnusedSegmentsIn()
-          : specificDataSourcesToKillUnusedSegmentsIn,
-          dataSourcesToNotKillStalePendingSegmentsIn == null
-          ? defaults.getDataSourcesToNotKillStalePendingSegmentsIn()
-          : dataSourcesToNotKillStalePendingSegmentsIn,
-          maxSegmentsInNodeLoadingQueue == null
-          ? defaults.getMaxSegmentsInNodeLoadingQueue()
-          : maxSegmentsInNodeLoadingQueue,
-          decommissioningNodes == null ? defaults.getDecommissioningNodes() : 
decommissioningNodes,
-          decommissioningMaxPercentOfMaxSegmentsToMove == null
-          ? defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove()
-          : decommissioningMaxPercentOfMaxSegmentsToMove,
-          pauseCoordination == null ? defaults.getPauseCoordination() : 
pauseCoordination,
-          replicateAfterLoadTimeout == null ? 
defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout,
-          maxNonPrimaryReplicantsToLoad == null
-          ? defaults.getMaxNonPrimaryReplicantsToLoad()
-          : maxNonPrimaryReplicantsToLoad,
-          useRoundRobinSegmentAssignment == null ? 
defaults.isUseRoundRobinSegmentAssignment() : useRoundRobinSegmentAssignment
+          valueOrDefault(
+              leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
+              
defaults.getLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments()
+          ),
+          valueOrDefault(mergeBytesLimit, defaults.getMergeBytesLimit()),
+          valueOrDefault(mergeSegmentsLimit, defaults.getMergeSegmentsLimit()),
+          valueOrDefault(maxSegmentsToMove, defaults.getMaxSegmentsToMove()),
+          valueOrDefault(percentOfSegmentsToConsiderPerMove, 
defaults.getPercentOfSegmentsToConsiderPerMove()),
+          valueOrDefault(useBatchedSegmentSampler, 
defaults.useBatchedSegmentSampler()),
+          valueOrDefault(replicantLifetime, defaults.getReplicantLifetime()),
+          valueOrDefault(replicationThrottleLimit, 
defaults.getReplicationThrottleLimit()),
+          valueOrDefault(balancerComputeThreads, 
defaults.getBalancerComputeThreads()),
+          valueOrDefault(emitBalancingStats, defaults.emitBalancingStats()),
+          valueOrDefault(specificDataSourcesToKillUnusedSegmentsIn, 
defaults.getSpecificDataSourcesToKillUnusedSegmentsIn()),
+          valueOrDefault(dataSourcesToNotKillStalePendingSegmentsIn, 
defaults.getDataSourcesToNotKillStalePendingSegmentsIn()),
+          valueOrDefault(maxSegmentsInNodeLoadingQueue, 
defaults.getMaxSegmentsInNodeLoadingQueue()),
+          valueOrDefault(decommissioningNodes, 
defaults.getDecommissioningNodes()),
+          valueOrDefault(
+              decommissioningMaxPercentOfMaxSegmentsToMove,
+              defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove()
+          ),
+          valueOrDefault(pauseCoordination, defaults.getPauseCoordination()),
+          valueOrDefault(replicateAfterLoadTimeout, 
defaults.getReplicateAfterLoadTimeout()),
+          valueOrDefault(maxNonPrimaryReplicantsToLoad, 
defaults.getMaxNonPrimaryReplicantsToLoad()),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [CoordinatorDynamicConfig.getMaxNonPrimaryReplicantsToLoad](1) 
should be avoided because it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4769)



##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java:
##########
@@ -713,75 +691,74 @@
       return this;
     }
 
+    /**
+     * Builds a CoordinatorDynamicConfig using either the configured values, 
or
+     * the default value if not configured.
+     */
     public CoordinatorDynamicConfig build()
     {
       return new CoordinatorDynamicConfig(
-          leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments == null
-          ? 
DEFAULT_LEADING_TIME_MILLIS_BEFORE_CAN_MARK_AS_UNUSED_OVERSHADOWED_SEGMENTS
-          : leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
-          mergeBytesLimit == null ? DEFAULT_MERGE_BYTES_LIMIT : 
mergeBytesLimit,
-          mergeSegmentsLimit == null ? DEFAULT_MERGE_SEGMENTS_LIMIT : 
mergeSegmentsLimit,
-          maxSegmentsToMove == null ? DEFAULT_MAX_SEGMENTS_TO_MOVE : 
maxSegmentsToMove,
-          percentOfSegmentsToConsiderPerMove == null ? 
DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
-                                                     : 
percentOfSegmentsToConsiderPerMove,
-          useBatchedSegmentSampler == null ? 
DEFAULT_USE_BATCHED_SEGMENT_SAMPLER : useBatchedSegmentSampler,
-          replicantLifetime == null ? DEFAULT_REPLICANT_LIFETIME : 
replicantLifetime,
-          replicationThrottleLimit == null ? 
DEFAULT_REPLICATION_THROTTLE_LIMIT : replicationThrottleLimit,
-          balancerComputeThreads == null ? DEFAULT_BALANCER_COMPUTE_THREADS : 
balancerComputeThreads,
-          emitBalancingStats == null ? DEFAULT_EMIT_BALANCING_STATS : 
emitBalancingStats,
+          valueOrDefault(
+              leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
+              Defaults.LEADING_MILLIS_BEFORE_MARK_UNUSED
+          ),
+          valueOrDefault(mergeBytesLimit, Defaults.MERGE_BYTES_LIMIT),
+          valueOrDefault(mergeSegmentsLimit, Defaults.MERGE_SEGMENTS_LIMIT),
+          valueOrDefault(maxSegmentsToMove, Defaults.MAX_SEGMENTS_TO_MOVE),
+          valueOrDefault(percentOfSegmentsToConsiderPerMove, 
Defaults.PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE),
+          valueOrDefault(useBatchedSegmentSampler, 
Defaults.USE_BATCHED_SEGMENT_SAMPLER),
+          valueOrDefault(replicantLifetime, Defaults.REPLICANT_LIFETIME),
+          valueOrDefault(replicationThrottleLimit, 
Defaults.REPLICATION_THROTTLE_LIMIT),
+          valueOrDefault(balancerComputeThreads, 
Defaults.BALANCER_COMPUTE_THREADS),
+          valueOrDefault(emitBalancingStats, Defaults.EMIT_BALANCING_STATS),
           specificDataSourcesToKillUnusedSegmentsIn,
           dataSourcesToNotKillStalePendingSegmentsIn,
-          maxSegmentsInNodeLoadingQueue == null
-          ? DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE
-          : maxSegmentsInNodeLoadingQueue,
+          valueOrDefault(maxSegmentsInNodeLoadingQueue, 
Defaults.MAX_SEGMENTS_IN_NODE_LOADING_QUEUE),
           decommissioningNodes,
-          decommissioningMaxPercentOfMaxSegmentsToMove == null
-          ? DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT
-          : decommissioningMaxPercentOfMaxSegmentsToMove,
-          pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : 
pauseCoordination,
-          replicateAfterLoadTimeout == null ? 
DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout,
-          maxNonPrimaryReplicantsToLoad == null ? 
DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD
-                                                : 
maxNonPrimaryReplicantsToLoad,
-          useRoundRobinSegmentAssignment == null ? 
DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT : useRoundRobinSegmentAssignment
+          valueOrDefault(
+              decommissioningMaxPercentOfMaxSegmentsToMove,
+              Defaults.DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT
+          ),
+          valueOrDefault(pauseCoordination, Defaults.PAUSE_COORDINATION),
+          valueOrDefault(replicateAfterLoadTimeout, 
Defaults.REPLICATE_AFTER_LOAD_TIMEOUT),
+          valueOrDefault(maxNonPrimaryReplicantsToLoad, 
Defaults.MAX_NON_PRIMARY_REPLICANTS_TO_LOAD),
+          valueOrDefault(useRoundRobinSegmentAssignment, 
Defaults.USE_ROUND_ROBIN_ASSIGNMENT)
       );
     }
 
+    private static <T> T valueOrDefault(@Nullable T value, @NotNull T 
defaultValue)
+    {
+      return value == null ? defaultValue : value;
+    }
+
     public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
     {
       return new CoordinatorDynamicConfig(
-          leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments == null
-          ? 
defaults.getLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments()
-          : leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
-          mergeBytesLimit == null ? defaults.getMergeBytesLimit() : 
mergeBytesLimit,
-          mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() : 
mergeSegmentsLimit,
-          maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() : 
maxSegmentsToMove,
-          percentOfSegmentsToConsiderPerMove == null
-          ? defaults.getPercentOfSegmentsToConsiderPerMove()
-          : percentOfSegmentsToConsiderPerMove,
-          useBatchedSegmentSampler == null ? 
defaults.useBatchedSegmentSampler() : useBatchedSegmentSampler,
-          replicantLifetime == null ? defaults.getReplicantLifetime() : 
replicantLifetime,
-          replicationThrottleLimit == null ? 
defaults.getReplicationThrottleLimit() : replicationThrottleLimit,
-          balancerComputeThreads == null ? 
defaults.getBalancerComputeThreads() : balancerComputeThreads,
-          emitBalancingStats == null ? defaults.emitBalancingStats() : 
emitBalancingStats,
-          specificDataSourcesToKillUnusedSegmentsIn == null
-          ? defaults.getSpecificDataSourcesToKillUnusedSegmentsIn()
-          : specificDataSourcesToKillUnusedSegmentsIn,
-          dataSourcesToNotKillStalePendingSegmentsIn == null
-          ? defaults.getDataSourcesToNotKillStalePendingSegmentsIn()
-          : dataSourcesToNotKillStalePendingSegmentsIn,
-          maxSegmentsInNodeLoadingQueue == null
-          ? defaults.getMaxSegmentsInNodeLoadingQueue()
-          : maxSegmentsInNodeLoadingQueue,
-          decommissioningNodes == null ? defaults.getDecommissioningNodes() : 
decommissioningNodes,
-          decommissioningMaxPercentOfMaxSegmentsToMove == null
-          ? defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove()
-          : decommissioningMaxPercentOfMaxSegmentsToMove,
-          pauseCoordination == null ? defaults.getPauseCoordination() : 
pauseCoordination,
-          replicateAfterLoadTimeout == null ? 
defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout,
-          maxNonPrimaryReplicantsToLoad == null
-          ? defaults.getMaxNonPrimaryReplicantsToLoad()
-          : maxNonPrimaryReplicantsToLoad,
-          useRoundRobinSegmentAssignment == null ? 
defaults.isUseRoundRobinSegmentAssignment() : useRoundRobinSegmentAssignment
+          valueOrDefault(
+              leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
+              
defaults.getLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments()
+          ),
+          valueOrDefault(mergeBytesLimit, defaults.getMergeBytesLimit()),
+          valueOrDefault(mergeSegmentsLimit, defaults.getMergeSegmentsLimit()),
+          valueOrDefault(maxSegmentsToMove, defaults.getMaxSegmentsToMove()),
+          valueOrDefault(percentOfSegmentsToConsiderPerMove, 
defaults.getPercentOfSegmentsToConsiderPerMove()),
+          valueOrDefault(useBatchedSegmentSampler, 
defaults.useBatchedSegmentSampler()),
+          valueOrDefault(replicantLifetime, defaults.getReplicantLifetime()),
+          valueOrDefault(replicationThrottleLimit, 
defaults.getReplicationThrottleLimit()),
+          valueOrDefault(balancerComputeThreads, 
defaults.getBalancerComputeThreads()),
+          valueOrDefault(emitBalancingStats, defaults.emitBalancingStats()),
+          valueOrDefault(specificDataSourcesToKillUnusedSegmentsIn, 
defaults.getSpecificDataSourcesToKillUnusedSegmentsIn()),
+          valueOrDefault(dataSourcesToNotKillStalePendingSegmentsIn, 
defaults.getDataSourcesToNotKillStalePendingSegmentsIn()),
+          valueOrDefault(maxSegmentsInNodeLoadingQueue, 
defaults.getMaxSegmentsInNodeLoadingQueue()),
+          valueOrDefault(decommissioningNodes, 
defaults.getDecommissioningNodes()),
+          valueOrDefault(
+              decommissioningMaxPercentOfMaxSegmentsToMove,
+              defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove()

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking 
[CoordinatorDynamicConfig.getDecommissioningMaxPercentOfMaxSegmentsToMove](1) 
should be avoided because it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4768)



##########
server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java:
##########
@@ -713,75 +691,74 @@
       return this;
     }
 
+    /**
+     * Builds a CoordinatorDynamicConfig using either the configured values, 
or
+     * the default value if not configured.
+     */
     public CoordinatorDynamicConfig build()
     {
       return new CoordinatorDynamicConfig(
-          leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments == null
-          ? 
DEFAULT_LEADING_TIME_MILLIS_BEFORE_CAN_MARK_AS_UNUSED_OVERSHADOWED_SEGMENTS
-          : leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
-          mergeBytesLimit == null ? DEFAULT_MERGE_BYTES_LIMIT : 
mergeBytesLimit,
-          mergeSegmentsLimit == null ? DEFAULT_MERGE_SEGMENTS_LIMIT : 
mergeSegmentsLimit,
-          maxSegmentsToMove == null ? DEFAULT_MAX_SEGMENTS_TO_MOVE : 
maxSegmentsToMove,
-          percentOfSegmentsToConsiderPerMove == null ? 
DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
-                                                     : 
percentOfSegmentsToConsiderPerMove,
-          useBatchedSegmentSampler == null ? 
DEFAULT_USE_BATCHED_SEGMENT_SAMPLER : useBatchedSegmentSampler,
-          replicantLifetime == null ? DEFAULT_REPLICANT_LIFETIME : 
replicantLifetime,
-          replicationThrottleLimit == null ? 
DEFAULT_REPLICATION_THROTTLE_LIMIT : replicationThrottleLimit,
-          balancerComputeThreads == null ? DEFAULT_BALANCER_COMPUTE_THREADS : 
balancerComputeThreads,
-          emitBalancingStats == null ? DEFAULT_EMIT_BALANCING_STATS : 
emitBalancingStats,
+          valueOrDefault(
+              leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
+              Defaults.LEADING_MILLIS_BEFORE_MARK_UNUSED
+          ),
+          valueOrDefault(mergeBytesLimit, Defaults.MERGE_BYTES_LIMIT),
+          valueOrDefault(mergeSegmentsLimit, Defaults.MERGE_SEGMENTS_LIMIT),
+          valueOrDefault(maxSegmentsToMove, Defaults.MAX_SEGMENTS_TO_MOVE),
+          valueOrDefault(percentOfSegmentsToConsiderPerMove, 
Defaults.PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE),
+          valueOrDefault(useBatchedSegmentSampler, 
Defaults.USE_BATCHED_SEGMENT_SAMPLER),
+          valueOrDefault(replicantLifetime, Defaults.REPLICANT_LIFETIME),
+          valueOrDefault(replicationThrottleLimit, 
Defaults.REPLICATION_THROTTLE_LIMIT),
+          valueOrDefault(balancerComputeThreads, 
Defaults.BALANCER_COMPUTE_THREADS),
+          valueOrDefault(emitBalancingStats, Defaults.EMIT_BALANCING_STATS),
           specificDataSourcesToKillUnusedSegmentsIn,
           dataSourcesToNotKillStalePendingSegmentsIn,
-          maxSegmentsInNodeLoadingQueue == null
-          ? DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE
-          : maxSegmentsInNodeLoadingQueue,
+          valueOrDefault(maxSegmentsInNodeLoadingQueue, 
Defaults.MAX_SEGMENTS_IN_NODE_LOADING_QUEUE),
           decommissioningNodes,
-          decommissioningMaxPercentOfMaxSegmentsToMove == null
-          ? DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT
-          : decommissioningMaxPercentOfMaxSegmentsToMove,
-          pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : 
pauseCoordination,
-          replicateAfterLoadTimeout == null ? 
DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout,
-          maxNonPrimaryReplicantsToLoad == null ? 
DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD
-                                                : 
maxNonPrimaryReplicantsToLoad,
-          useRoundRobinSegmentAssignment == null ? 
DEFAULT_USE_ROUND_ROBIN_ASSIGNMENT : useRoundRobinSegmentAssignment
+          valueOrDefault(
+              decommissioningMaxPercentOfMaxSegmentsToMove,
+              Defaults.DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT
+          ),
+          valueOrDefault(pauseCoordination, Defaults.PAUSE_COORDINATION),
+          valueOrDefault(replicateAfterLoadTimeout, 
Defaults.REPLICATE_AFTER_LOAD_TIMEOUT),
+          valueOrDefault(maxNonPrimaryReplicantsToLoad, 
Defaults.MAX_NON_PRIMARY_REPLICANTS_TO_LOAD),
+          valueOrDefault(useRoundRobinSegmentAssignment, 
Defaults.USE_ROUND_ROBIN_ASSIGNMENT)
       );
     }
 
+    private static <T> T valueOrDefault(@Nullable T value, @NotNull T 
defaultValue)
+    {
+      return value == null ? defaultValue : value;
+    }
+
     public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
     {
       return new CoordinatorDynamicConfig(
-          leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments == null
-          ? 
defaults.getLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments()
-          : leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
-          mergeBytesLimit == null ? defaults.getMergeBytesLimit() : 
mergeBytesLimit,
-          mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() : 
mergeSegmentsLimit,
-          maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() : 
maxSegmentsToMove,
-          percentOfSegmentsToConsiderPerMove == null
-          ? defaults.getPercentOfSegmentsToConsiderPerMove()
-          : percentOfSegmentsToConsiderPerMove,
-          useBatchedSegmentSampler == null ? 
defaults.useBatchedSegmentSampler() : useBatchedSegmentSampler,
-          replicantLifetime == null ? defaults.getReplicantLifetime() : 
replicantLifetime,
-          replicationThrottleLimit == null ? 
defaults.getReplicationThrottleLimit() : replicationThrottleLimit,
-          balancerComputeThreads == null ? 
defaults.getBalancerComputeThreads() : balancerComputeThreads,
-          emitBalancingStats == null ? defaults.emitBalancingStats() : 
emitBalancingStats,
-          specificDataSourcesToKillUnusedSegmentsIn == null
-          ? defaults.getSpecificDataSourcesToKillUnusedSegmentsIn()
-          : specificDataSourcesToKillUnusedSegmentsIn,
-          dataSourcesToNotKillStalePendingSegmentsIn == null
-          ? defaults.getDataSourcesToNotKillStalePendingSegmentsIn()
-          : dataSourcesToNotKillStalePendingSegmentsIn,
-          maxSegmentsInNodeLoadingQueue == null
-          ? defaults.getMaxSegmentsInNodeLoadingQueue()
-          : maxSegmentsInNodeLoadingQueue,
-          decommissioningNodes == null ? defaults.getDecommissioningNodes() : 
decommissioningNodes,
-          decommissioningMaxPercentOfMaxSegmentsToMove == null
-          ? defaults.getDecommissioningMaxPercentOfMaxSegmentsToMove()
-          : decommissioningMaxPercentOfMaxSegmentsToMove,
-          pauseCoordination == null ? defaults.getPauseCoordination() : 
pauseCoordination,
-          replicateAfterLoadTimeout == null ? 
defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout,
-          maxNonPrimaryReplicantsToLoad == null
-          ? defaults.getMaxNonPrimaryReplicantsToLoad()
-          : maxNonPrimaryReplicantsToLoad,
-          useRoundRobinSegmentAssignment == null ? 
defaults.isUseRoundRobinSegmentAssignment() : useRoundRobinSegmentAssignment
+          valueOrDefault(
+              leadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments,
+              
defaults.getLeadingTimeMillisBeforeCanMarkAsUnusedOvershadowedSegments()
+          ),
+          valueOrDefault(mergeBytesLimit, defaults.getMergeBytesLimit()),
+          valueOrDefault(mergeSegmentsLimit, defaults.getMergeSegmentsLimit()),
+          valueOrDefault(maxSegmentsToMove, defaults.getMaxSegmentsToMove()),
+          valueOrDefault(percentOfSegmentsToConsiderPerMove, 
defaults.getPercentOfSegmentsToConsiderPerMove()),
+          valueOrDefault(useBatchedSegmentSampler, 
defaults.useBatchedSegmentSampler()),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [CoordinatorDynamicConfig.useBatchedSegmentSampler](1) should be 
avoided because it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4767)



##########
server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java:
##########
@@ -402,18 +409,40 @@
             largeSegments.get(2),
             largeSegments2.get(0),
             largeSegments2.get(1)
-        ),
-        smallSegment
+        )
     );
 
-    Assert.assertEquals(5L, stats.getGlobalStat(LoadRule.ASSIGNED_COUNT));
-    Assert.assertFalse(stats.hasPerTierStats());
-
+    Assert.assertEquals(5L, 
stats.getDataSourceStat(Stats.Segments.ASSIGNED_BROADCAST, DS_SMALL));
     Assert.assertTrue(
         druidCluster
             .getAllServers()
             .stream()
             .allMatch(holder -> holder.isLoadingSegment(smallSegment) || 
holder.isServingSegment(smallSegment))
     );
   }
+
+  private CoordinatorRunStats runRuleAndGetStats(
+      Rule rule,
+      DataSegment segment,
+      DruidCoordinatorRuntimeParams params
+  )
+  {
+    final CoordinatorDynamicConfig dynamicConfig = 
params.getCoordinatorDynamicConfig();
+    ReplicationThrottler throttler = new ReplicationThrottler(
+        Sets.newHashSet(params.getDruidCluster().getTierNames()),
+        dynamicConfig.getReplicationThrottleLimit(),
+        dynamicConfig.getReplicantLifetime(),
+        dynamicConfig.getMaxNonPrimaryReplicantsToLoad()

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [CoordinatorDynamicConfig.getMaxNonPrimaryReplicantsToLoad](1) 
should be avoided because it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4774)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to