github-code-scanning[bot] commented on code in PR #13197:
URL: https://github.com/apache/druid/pull/13197#discussion_r1162306189


##########
server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java:
##########
@@ -20,292 +20,365 @@
 package org.apache.druid.server.coordinator.duty;
 
 import com.google.common.collect.Lists;
-import org.apache.druid.client.ImmutableDruidServer;
-import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.server.coordinator.BalancerSegmentHolder;
 import org.apache.druid.server.coordinator.BalancerStrategy;
-import org.apache.druid.server.coordinator.CoordinatorStats;
-import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
-import org.apache.druid.server.coordinator.LoadPeonCallback;
-import org.apache.druid.server.coordinator.LoadQueuePeon;
+import org.apache.druid.server.coordinator.SegmentLoader;
+import org.apache.druid.server.coordinator.SegmentStateManager;
 import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Stats;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableSet;
+import java.util.Set;
 import java.util.SortedSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 
 /**
+ *
  */
 public class BalanceSegments implements CoordinatorDuty
 {
   protected static final EmittingLogger log = new 
EmittingLogger(BalanceSegments.class);
+  private final SegmentStateManager stateManager;
 
-  protected final DruidCoordinator coordinator;
-
-  protected final Map<String, ConcurrentHashMap<SegmentId, 
BalancerSegmentHolder>> currentlyMovingSegments =
-      new HashMap<>();
-
-  public BalanceSegments(DruidCoordinator coordinator)
+  public BalanceSegments(SegmentStateManager stateManager)
   {
-    this.coordinator = coordinator;
+    this.stateManager = stateManager;
   }
 
-  protected void reduceLifetimes(String tier)
+  /**
+   * Reduces the lifetimes of segments currently being moved in all the tiers.
+   * Raises alerts for segments stuck in queue
+   * Returns the set of tiers that are currently moving some segments and 
won't be
+   * eligible for assigning more balancing moves in this run.
+   */
+  private void reduceLifetimesAndAlert(int maxLifetime)

Review Comment:
   ## Useless parameter
   
   The parameter 'maxLifetime' is never used.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4746)



##########
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java:
##########
@@ -439,18 +426,21 @@
         // This forcing code should be revised
         // when/if the autocompaction code policy to decide which segments to 
compact changes
         if (dropExisting == null || !dropExisting) {
-          if (segmentsToCompact.stream().allMatch(dataSegment -> 
dataSegment.isTombstone())) {
+          if (segmentsToCompact.stream().allMatch(DataSegment::isTombstone)) {
             dropExisting = true;
-            LOG.info("Forcing dropExisting to %s since all segments to compact 
are tombstones", dropExisting);
+            LOG.info("Forcing dropExisting to true since all segments to 
compact are tombstones.");
           }
         }
 
-        // make tuningConfig
         final String taskId = indexingServiceClient.compactSegments(
             "coordinator-issued",
             segmentsToCompact,
             config.getTaskPriority(),
-            
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), 
config.getMaxRowsPerSegment(), config.getMetricsSpec() != null),
+            ClientCompactionTaskQueryTuningConfig.from(
+                config.getTuningConfig(),
+                config.getMaxRowsPerSegment(),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [DataSourceCompactionConfig.getMaxRowsPerSegment](1) should be 
avoided because it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4366)



##########
server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java:
##########
@@ -358,265 +257,172 @@
   @Test
   public void testMoveToDecommissioningServer()
   {
-    mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
-    mockDruidServer(druidServer2, "2", "normal", 0L, 100L, 
Collections.emptyList());
-
-    EasyMock.replay(druidServer3);
-    EasyMock.replay(druidServer4);
-
-    mockCoordinator(coordinator);
+    final ServerHolder serverHolder1 = createHolder(server1, false, 
allSegments);
+    final ServerHolder serverHolder2 = createHolder(server2, true);
 
     BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-    EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), 
EasyMock.anyObject(), EasyMock.anyInt()))
-            .andReturn(ImmutableList.of(new 
BalancerSegmentHolder(druidServer1, segment1)).iterator())
-            .anyTimes();
+    expectPickLoadingSegmentsAndReturnEmpty(strategy, 2);
+    expectPickLoadedSegmentsAndReturn(
+        strategy, 2,
+        new BalancerSegmentHolder(serverHolder1, segment1)
+    );
     EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), 
EasyMock.anyObject())).andAnswer(() -> {
       List<ServerHolder> holders = (List<ServerHolder>) 
EasyMock.getCurrentArguments()[1];
       return holders.get(0);
     }).anyTimes();
     EasyMock.replay(strategy);
 
-    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
-        ImmutableList.of(druidServer1, druidServer2),
-        ImmutableList.of(peon1, peon2),
-        ImmutableList.of(false, true)
-    )
-        .withBalancerStrategy(strategy)
-        .withBroadcastDatasources(broadcastDatasources)
-        .build();
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(serverHolder1, serverHolder2)
+            .withBalancerStrategy(strategy)
+            .withBroadcastDatasources(broadcastDatasources)
+            .build();
 
-    params = new BalanceSegmentsTester(coordinator).run(params);
+    params = new BalanceSegments(stateManager).run(params);
     EasyMock.verify(strategy);
-    Assert.assertEquals(0, 
params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
+    Assert.assertEquals(0, 
params.getCoordinatorStats().getTieredStat(Stats.Segments.MOVED, "normal"));
   }
 
   @Test
   public void testMoveFromDecommissioningServer()
   {
-    mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
-    mockDruidServer(druidServer2, "2", "normal", 0L, 100L, 
Collections.emptyList());
-
-    EasyMock.replay(druidServer3);
-    EasyMock.replay(druidServer4);
-
-    mockCoordinator(coordinator);
+    final ServerHolder holder1 = createHolder(server1, allSegments);
+    final ServerHolder holder2 = createHolder(server2);
 
-    ServerHolder holder2 = new ServerHolder(druidServer2, peon2, false);
     BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-    EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), 
EasyMock.anyObject(), EasyMock.anyInt()))
-            .andReturn(ImmutableList.of(new 
BalancerSegmentHolder(druidServer1, segment1)).iterator())
-            .once();
+    expectPickLoadingSegmentsAndReturnEmpty(strategy, 1);
+    expectPickLoadedSegmentsAndReturn(
+        strategy, 1,
+        new BalancerSegmentHolder(holder1, segment1)
+    );
     EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), 
EasyMock.anyObject()))
             .andReturn(holder2)
             .once();
     EasyMock.replay(strategy);
 
-    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
-        ImmutableList.of(druidServer1, druidServer2),
-        ImmutableList.of(peon1, peon2),
-        ImmutableList.of(true, false)
-    )
+    DruidCoordinatorRuntimeParams params = 
defaultRuntimeParamsBuilder(holder1, holder2)
         
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(1).build())
         .withBalancerStrategy(strategy)
         .withBroadcastDatasources(broadcastDatasources)
         .build();
 
-    params = new BalanceSegmentsTester(coordinator).run(params);
+    params = new BalanceSegments(stateManager).run(params);
     EasyMock.verify(strategy);
-    Assert.assertEquals(1, 
params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
-    Assert.assertEquals(0, peon1.getNumberOfSegmentsInQueue());
-    Assert.assertEquals(1, peon2.getNumberOfSegmentsInQueue());
+    Assert.assertEquals(1, 
params.getCoordinatorStats().getTieredStat(Stats.Segments.MOVED, "normal"));
+    Assert.assertEquals(0, holder1.getPeon().getNumberOfSegmentsToLoad());
+    Assert.assertEquals(1, holder2.getPeon().getNumberOfSegmentsToLoad());
   }
 
   @Test
   public void testMoveMaxLoadQueueServerBalancer()
   {
-    mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
-    mockDruidServer(druidServer2, "2", "normal", 0L, 100L, 
Collections.emptyList());
-
-    EasyMock.replay(druidServer3);
-    EasyMock.replay(druidServer4);
-
-    // Mock stuff that the coordinator needs
-    mockCoordinator(coordinator);
-
-    BalancerStrategy predefinedPickOrderStrategy = new 
PredefinedPickOrderBalancerStrategy(
-        balancerStrategy,
-        ImmutableList.of(
-            new BalancerSegmentHolder(druidServer1, segment1),
-            new BalancerSegmentHolder(druidServer1, segment2),
-            new BalancerSegmentHolder(druidServer1, segment3),
-            new BalancerSegmentHolder(druidServer1, segment4)
+    final int maxSegmentsInQueue = 1;
+    final ServerHolder holder1 = createHolder(server1, maxSegmentsInQueue, 
allSegments);
+    final ServerHolder holder2 = createHolder(server2, maxSegmentsInQueue);
+
+    final CoordinatorDynamicConfig dynamicConfig = CoordinatorDynamicConfig
+        .builder()
+        .withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
+        .withMaxSegmentsInNodeLoadingQueue(maxSegmentsInQueue)
+        .build();
+    DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers
+        .newBuilder()
+        .withDruidCluster(
+            DruidClusterBuilder.newBuilder().addTier("normal", holder1, 
holder2).build()
         )
-    );
-
-    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
-        ImmutableList.of(druidServer1, druidServer2),
-        ImmutableList.of(peon1, peon2)
-    )
-        .withBalancerStrategy(predefinedPickOrderStrategy)
-        .withBroadcastDatasources(broadcastDatasources)
-        .withDynamicConfigs(
-            CoordinatorDynamicConfig
-                .builder()
-                .withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
-                .withMaxSegmentsInNodeLoadingQueue(1)
-                .build()
+        .withLoadManagementPeons(
+            ImmutableMap.of(server1.getName(), holder1.getPeon(), 
server2.getName(), holder2.getPeon())
         )
+        .withUsedSegmentsInTest(allSegments)
+        .withBalancerStrategy(balancerStrategy)
+        .withBroadcastDatasources(broadcastDatasources)
+        .withDynamicConfigs(dynamicConfig)
+        .withReplicationManager(createReplicationThrottler(dynamicConfig))
         .build();
 
-    params = new BalanceSegmentsTester(coordinator).run(params);
+    params = new BalanceSegments(stateManager).run(params);
 
     // max to move is 5, all segments on server 1, but only expect to move 1 
to server 2 since max node load queue is 1
-    Assert.assertEquals(1, 
params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
-  }
-
-  @Test
-  public void testMoveSameSegmentTwice()
-  {
-    mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
-    mockDruidServer(druidServer2, "2", "normal", 0L, 100L, 
Collections.emptyList());
-
-    EasyMock.replay(druidServer3);
-    EasyMock.replay(druidServer4);
-
-    // Mock stuff that the coordinator needs
-    mockCoordinator(coordinator);
-
-    BalancerStrategy predefinedPickOrderStrategy = new 
PredefinedPickOrderBalancerStrategy(
-        balancerStrategy,
-        ImmutableList.of(
-            new BalancerSegmentHolder(druidServer1, segment1)
-        )
-    );
-
-    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
-        ImmutableList.of(druidServer1, druidServer2),
-        ImmutableList.of(peon1, peon2)
-    )
-        .withBalancerStrategy(predefinedPickOrderStrategy)
-        .withBroadcastDatasources(broadcastDatasources)
-        .withDynamicConfigs(
-            CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(
-                2
-            ).build()
-        )
-        .build();
-
-    params = new BalanceSegmentsTester(coordinator).run(params);
-    Assert.assertEquals(1, 
params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
+    Assert.assertEquals(1, 
params.getCoordinatorStats().getTieredStat(Stats.Segments.MOVED, "normal"));
   }
 
   @Test
   public void testRun1()
   {
     // Mock some servers of different usages
-    mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
-    mockDruidServer(druidServer2, "2", "normal", 0L, 100L, 
Collections.emptyList());
-
-    EasyMock.replay(druidServer3);
-    EasyMock.replay(druidServer4);
-
-    // Mock stuff that the coordinator needs
-    mockCoordinator(coordinator);
-
     DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
-        ImmutableList.of(druidServer1, druidServer2),
-        ImmutableList.of(peon1, peon2)
+        createHolder(server1, allSegments),
+        createHolder(server2)
     ).build();
 
-    params = new BalanceSegmentsTester(coordinator).run(params);
-    Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", 
"normal") > 0);
+    params = new BalanceSegments(stateManager).run(params);
+    
Assert.assertTrue(params.getCoordinatorStats().getTieredStat(Stats.Segments.MOVED,
 "normal") > 0);
   }
 
   @Test
   public void testRun2()
   {
     // Mock some servers of different usages
-    mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
-    mockDruidServer(druidServer2, "2", "normal", 0L, 100L, 
Collections.emptyList());
-    mockDruidServer(druidServer3, "3", "normal", 0L, 100L, 
Collections.emptyList());
-    mockDruidServer(druidServer4, "4", "normal", 0L, 100L, 
Collections.emptyList());
-
-    // Mock stuff that the coordinator needs
-    mockCoordinator(coordinator);
-
-    DruidCoordinatorRuntimeParams params = 
defaultRuntimeParamsBuilder(druidServers, peons).build();
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
+        createHolder(server1, allSegments),
+        createHolder(server2),
+        createHolder(server3),
+        createHolder(server4)
+    ).build();
 
-    params = new BalanceSegmentsTester(coordinator).run(params);
-    Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", 
"normal") > 0);
+    params = new BalanceSegments(stateManager).run(params);
+    
Assert.assertTrue(params.getCoordinatorStats().getTieredStat(Stats.Segments.MOVED,
 "normal") > 0);
   }
 
-  /**
-   * Testing that the dynamic coordinator config value, 
percentOfSegmentsToConsiderPerMove, is honored when calling
-   * out to pickSegmentToMove. This config limits the number of segments that 
are considered when looking for a segment
-   * to move.
-   */
   @Test
-  public void testThatDynamicConfigIsHonoredWhenPickingSegmentToMove()
+  public void testThatMaxSegmentsToMoveIsHonored()
   {
-    mockDruidServer(druidServer1, "1", "normal", 50L, 100L, 
Arrays.asList(segment1, segment2));
-    mockDruidServer(druidServer2, "2", "normal", 30L, 100L, 
Arrays.asList(segment3, segment4));
-    mockDruidServer(druidServer3, "3", "normal", 0L, 100L, 
Collections.emptyList());
-
-    EasyMock.replay(druidServer4);
-
-    mockCoordinator(coordinator);
+    // Move from non-decomissioning servers
+    final ServerHolder holder1 = createHolder(server1, segment1, segment2);
+    final ServerHolder holder2 = createHolder(server2, segment3, segment4);
+    final ServerHolder holder3 = createHolder(server3);
 
     BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-
-    // Move from non-decomissioning servers
-    EasyMock.expect(
-        strategy.pickSegmentsToMove(
-            ImmutableList.of(
-                new ServerHolder(druidServer3, peon3, false),
-                new ServerHolder(druidServer2, peon2, false),
-                new ServerHolder(druidServer1, peon1, false)
-            ),
-            broadcastDatasources,
-            40.0
-        )
-    )
-            .andReturn(ImmutableList.of(new 
BalancerSegmentHolder(druidServer2, segment3)).iterator());
+    expectPickLoadingSegmentsAndReturnEmpty(strategy, 1);
+    expectPickLoadedSegmentsAndReturn(
+        strategy,
+        1,
+        new BalancerSegmentHolder(holder2, segment3)
+    );
 
     EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), 
EasyMock.anyObject()))
-            .andReturn(new ServerHolder(druidServer3, peon3))
+            .andReturn(holder3)
             .anyTimes();
     EasyMock.replay(strategy);
 
-    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
-        ImmutableList.of(druidServer1, druidServer2, druidServer3),
-        ImmutableList.of(peon1, peon2, peon3),
-        ImmutableList.of(false, false, false)
-    )
-        .withDynamicConfigs(
-            CoordinatorDynamicConfig.builder()
-                                    .withMaxSegmentsToMove(1)
-                                    .withUseBatchedSegmentSampler(false)
-                                    .withPercentOfSegmentsToConsiderPerMove(40)
-                                    .build()
-        )
-        .withBalancerStrategy(strategy)
-        .withBroadcastDatasources(broadcastDatasources)
-        .build();
-
-    params = new BalanceSegmentsTester(coordinator).run(params);
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(holder1, holder2, holder3)
+            .withDynamicConfigs(
+                CoordinatorDynamicConfig.builder()
+                                        .withMaxSegmentsToMove(1)
+                                        .withUseBatchedSegmentSampler(true)
+                                        
.withPercentOfSegmentsToConsiderPerMove(40)

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [Builder.withPercentOfSegmentsToConsiderPerMove](1) should be 
avoided because it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4744)



##########
server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java:
##########
@@ -358,265 +257,172 @@
   @Test
   public void testMoveToDecommissioningServer()
   {
-    mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
-    mockDruidServer(druidServer2, "2", "normal", 0L, 100L, 
Collections.emptyList());
-
-    EasyMock.replay(druidServer3);
-    EasyMock.replay(druidServer4);
-
-    mockCoordinator(coordinator);
+    final ServerHolder serverHolder1 = createHolder(server1, false, 
allSegments);
+    final ServerHolder serverHolder2 = createHolder(server2, true);
 
     BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-    EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), 
EasyMock.anyObject(), EasyMock.anyInt()))
-            .andReturn(ImmutableList.of(new 
BalancerSegmentHolder(druidServer1, segment1)).iterator())
-            .anyTimes();
+    expectPickLoadingSegmentsAndReturnEmpty(strategy, 2);
+    expectPickLoadedSegmentsAndReturn(
+        strategy, 2,
+        new BalancerSegmentHolder(serverHolder1, segment1)
+    );
     EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), 
EasyMock.anyObject())).andAnswer(() -> {
       List<ServerHolder> holders = (List<ServerHolder>) 
EasyMock.getCurrentArguments()[1];
       return holders.get(0);
     }).anyTimes();
     EasyMock.replay(strategy);
 
-    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
-        ImmutableList.of(druidServer1, druidServer2),
-        ImmutableList.of(peon1, peon2),
-        ImmutableList.of(false, true)
-    )
-        .withBalancerStrategy(strategy)
-        .withBroadcastDatasources(broadcastDatasources)
-        .build();
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(serverHolder1, serverHolder2)
+            .withBalancerStrategy(strategy)
+            .withBroadcastDatasources(broadcastDatasources)
+            .build();
 
-    params = new BalanceSegmentsTester(coordinator).run(params);
+    params = new BalanceSegments(stateManager).run(params);
     EasyMock.verify(strategy);
-    Assert.assertEquals(0, 
params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
+    Assert.assertEquals(0, 
params.getCoordinatorStats().getTieredStat(Stats.Segments.MOVED, "normal"));
   }
 
   @Test
   public void testMoveFromDecommissioningServer()
   {
-    mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
-    mockDruidServer(druidServer2, "2", "normal", 0L, 100L, 
Collections.emptyList());
-
-    EasyMock.replay(druidServer3);
-    EasyMock.replay(druidServer4);
-
-    mockCoordinator(coordinator);
+    final ServerHolder holder1 = createHolder(server1, allSegments);
+    final ServerHolder holder2 = createHolder(server2);
 
-    ServerHolder holder2 = new ServerHolder(druidServer2, peon2, false);
     BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-    EasyMock.expect(strategy.pickSegmentsToMove(EasyMock.anyObject(), 
EasyMock.anyObject(), EasyMock.anyInt()))
-            .andReturn(ImmutableList.of(new 
BalancerSegmentHolder(druidServer1, segment1)).iterator())
-            .once();
+    expectPickLoadingSegmentsAndReturnEmpty(strategy, 1);
+    expectPickLoadedSegmentsAndReturn(
+        strategy, 1,
+        new BalancerSegmentHolder(holder1, segment1)
+    );
     EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), 
EasyMock.anyObject()))
             .andReturn(holder2)
             .once();
     EasyMock.replay(strategy);
 
-    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
-        ImmutableList.of(druidServer1, druidServer2),
-        ImmutableList.of(peon1, peon2),
-        ImmutableList.of(true, false)
-    )
+    DruidCoordinatorRuntimeParams params = 
defaultRuntimeParamsBuilder(holder1, holder2)
         
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(1).build())
         .withBalancerStrategy(strategy)
         .withBroadcastDatasources(broadcastDatasources)
         .build();
 
-    params = new BalanceSegmentsTester(coordinator).run(params);
+    params = new BalanceSegments(stateManager).run(params);
     EasyMock.verify(strategy);
-    Assert.assertEquals(1, 
params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
-    Assert.assertEquals(0, peon1.getNumberOfSegmentsInQueue());
-    Assert.assertEquals(1, peon2.getNumberOfSegmentsInQueue());
+    Assert.assertEquals(1, 
params.getCoordinatorStats().getTieredStat(Stats.Segments.MOVED, "normal"));
+    Assert.assertEquals(0, holder1.getPeon().getNumberOfSegmentsToLoad());
+    Assert.assertEquals(1, holder2.getPeon().getNumberOfSegmentsToLoad());
   }
 
   @Test
   public void testMoveMaxLoadQueueServerBalancer()
   {
-    mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
-    mockDruidServer(druidServer2, "2", "normal", 0L, 100L, 
Collections.emptyList());
-
-    EasyMock.replay(druidServer3);
-    EasyMock.replay(druidServer4);
-
-    // Mock stuff that the coordinator needs
-    mockCoordinator(coordinator);
-
-    BalancerStrategy predefinedPickOrderStrategy = new 
PredefinedPickOrderBalancerStrategy(
-        balancerStrategy,
-        ImmutableList.of(
-            new BalancerSegmentHolder(druidServer1, segment1),
-            new BalancerSegmentHolder(druidServer1, segment2),
-            new BalancerSegmentHolder(druidServer1, segment3),
-            new BalancerSegmentHolder(druidServer1, segment4)
+    final int maxSegmentsInQueue = 1;
+    final ServerHolder holder1 = createHolder(server1, maxSegmentsInQueue, 
allSegments);
+    final ServerHolder holder2 = createHolder(server2, maxSegmentsInQueue);
+
+    final CoordinatorDynamicConfig dynamicConfig = CoordinatorDynamicConfig
+        .builder()
+        .withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
+        .withMaxSegmentsInNodeLoadingQueue(maxSegmentsInQueue)
+        .build();
+    DruidCoordinatorRuntimeParams params = CoordinatorRuntimeParamsTestHelpers
+        .newBuilder()
+        .withDruidCluster(
+            DruidClusterBuilder.newBuilder().addTier("normal", holder1, 
holder2).build()
         )
-    );
-
-    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
-        ImmutableList.of(druidServer1, druidServer2),
-        ImmutableList.of(peon1, peon2)
-    )
-        .withBalancerStrategy(predefinedPickOrderStrategy)
-        .withBroadcastDatasources(broadcastDatasources)
-        .withDynamicConfigs(
-            CoordinatorDynamicConfig
-                .builder()
-                .withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
-                .withMaxSegmentsInNodeLoadingQueue(1)
-                .build()
+        .withLoadManagementPeons(
+            ImmutableMap.of(server1.getName(), holder1.getPeon(), 
server2.getName(), holder2.getPeon())
         )
+        .withUsedSegmentsInTest(allSegments)
+        .withBalancerStrategy(balancerStrategy)
+        .withBroadcastDatasources(broadcastDatasources)
+        .withDynamicConfigs(dynamicConfig)
+        .withReplicationManager(createReplicationThrottler(dynamicConfig))
         .build();
 
-    params = new BalanceSegmentsTester(coordinator).run(params);
+    params = new BalanceSegments(stateManager).run(params);
 
     // max to move is 5, all segments on server 1, but only expect to move 1 
to server 2 since max node load queue is 1
-    Assert.assertEquals(1, 
params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
-  }
-
-  @Test
-  public void testMoveSameSegmentTwice()
-  {
-    mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
-    mockDruidServer(druidServer2, "2", "normal", 0L, 100L, 
Collections.emptyList());
-
-    EasyMock.replay(druidServer3);
-    EasyMock.replay(druidServer4);
-
-    // Mock stuff that the coordinator needs
-    mockCoordinator(coordinator);
-
-    BalancerStrategy predefinedPickOrderStrategy = new 
PredefinedPickOrderBalancerStrategy(
-        balancerStrategy,
-        ImmutableList.of(
-            new BalancerSegmentHolder(druidServer1, segment1)
-        )
-    );
-
-    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
-        ImmutableList.of(druidServer1, druidServer2),
-        ImmutableList.of(peon1, peon2)
-    )
-        .withBalancerStrategy(predefinedPickOrderStrategy)
-        .withBroadcastDatasources(broadcastDatasources)
-        .withDynamicConfigs(
-            CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(
-                2
-            ).build()
-        )
-        .build();
-
-    params = new BalanceSegmentsTester(coordinator).run(params);
-    Assert.assertEquals(1, 
params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
+    Assert.assertEquals(1, 
params.getCoordinatorStats().getTieredStat(Stats.Segments.MOVED, "normal"));
   }
 
   @Test
   public void testRun1()
   {
     // Mock some servers of different usages
-    mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
-    mockDruidServer(druidServer2, "2", "normal", 0L, 100L, 
Collections.emptyList());
-
-    EasyMock.replay(druidServer3);
-    EasyMock.replay(druidServer4);
-
-    // Mock stuff that the coordinator needs
-    mockCoordinator(coordinator);
-
     DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
-        ImmutableList.of(druidServer1, druidServer2),
-        ImmutableList.of(peon1, peon2)
+        createHolder(server1, allSegments),
+        createHolder(server2)
     ).build();
 
-    params = new BalanceSegmentsTester(coordinator).run(params);
-    Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", 
"normal") > 0);
+    params = new BalanceSegments(stateManager).run(params);
+    
Assert.assertTrue(params.getCoordinatorStats().getTieredStat(Stats.Segments.MOVED,
 "normal") > 0);
   }
 
   @Test
   public void testRun2()
   {
     // Mock some servers of different usages
-    mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
-    mockDruidServer(druidServer2, "2", "normal", 0L, 100L, 
Collections.emptyList());
-    mockDruidServer(druidServer3, "3", "normal", 0L, 100L, 
Collections.emptyList());
-    mockDruidServer(druidServer4, "4", "normal", 0L, 100L, 
Collections.emptyList());
-
-    // Mock stuff that the coordinator needs
-    mockCoordinator(coordinator);
-
-    DruidCoordinatorRuntimeParams params = 
defaultRuntimeParamsBuilder(druidServers, peons).build();
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
+        createHolder(server1, allSegments),
+        createHolder(server2),
+        createHolder(server3),
+        createHolder(server4)
+    ).build();
 
-    params = new BalanceSegmentsTester(coordinator).run(params);
-    Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", 
"normal") > 0);
+    params = new BalanceSegments(stateManager).run(params);
+    
Assert.assertTrue(params.getCoordinatorStats().getTieredStat(Stats.Segments.MOVED,
 "normal") > 0);
   }
 
-  /**
-   * Testing that the dynamic coordinator config value, 
percentOfSegmentsToConsiderPerMove, is honored when calling
-   * out to pickSegmentToMove. This config limits the number of segments that 
are considered when looking for a segment
-   * to move.
-   */
   @Test
-  public void testThatDynamicConfigIsHonoredWhenPickingSegmentToMove()
+  public void testThatMaxSegmentsToMoveIsHonored()
   {
-    mockDruidServer(druidServer1, "1", "normal", 50L, 100L, 
Arrays.asList(segment1, segment2));
-    mockDruidServer(druidServer2, "2", "normal", 30L, 100L, 
Arrays.asList(segment3, segment4));
-    mockDruidServer(druidServer3, "3", "normal", 0L, 100L, 
Collections.emptyList());
-
-    EasyMock.replay(druidServer4);
-
-    mockCoordinator(coordinator);
+    // Move from non-decomissioning servers
+    final ServerHolder holder1 = createHolder(server1, segment1, segment2);
+    final ServerHolder holder2 = createHolder(server2, segment3, segment4);
+    final ServerHolder holder3 = createHolder(server3);
 
     BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
-
-    // Move from non-decomissioning servers
-    EasyMock.expect(
-        strategy.pickSegmentsToMove(
-            ImmutableList.of(
-                new ServerHolder(druidServer3, peon3, false),
-                new ServerHolder(druidServer2, peon2, false),
-                new ServerHolder(druidServer1, peon1, false)
-            ),
-            broadcastDatasources,
-            40.0
-        )
-    )
-            .andReturn(ImmutableList.of(new 
BalancerSegmentHolder(druidServer2, segment3)).iterator());
+    expectPickLoadingSegmentsAndReturnEmpty(strategy, 1);
+    expectPickLoadedSegmentsAndReturn(
+        strategy,
+        1,
+        new BalancerSegmentHolder(holder2, segment3)
+    );
 
     EasyMock.expect(strategy.findNewSegmentHomeBalancer(EasyMock.anyObject(), 
EasyMock.anyObject()))
-            .andReturn(new ServerHolder(druidServer3, peon3))
+            .andReturn(holder3)
             .anyTimes();
     EasyMock.replay(strategy);
 
-    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
-        ImmutableList.of(druidServer1, druidServer2, druidServer3),
-        ImmutableList.of(peon1, peon2, peon3),
-        ImmutableList.of(false, false, false)
-    )
-        .withDynamicConfigs(
-            CoordinatorDynamicConfig.builder()
-                                    .withMaxSegmentsToMove(1)
-                                    .withUseBatchedSegmentSampler(false)
-                                    .withPercentOfSegmentsToConsiderPerMove(40)
-                                    .build()
-        )
-        .withBalancerStrategy(strategy)
-        .withBroadcastDatasources(broadcastDatasources)
-        .build();
-
-    params = new BalanceSegmentsTester(coordinator).run(params);
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(holder1, holder2, holder3)
+            .withDynamicConfigs(
+                CoordinatorDynamicConfig.builder()
+                                        .withMaxSegmentsToMove(1)
+                                        .withUseBatchedSegmentSampler(true)

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [Builder.withUseBatchedSegmentSampler](1) should be avoided because 
it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/4745)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to