github-code-scanning[bot] commented on code in PR #13197:
URL: https://github.com/apache/druid/pull/13197#discussion_r1232985512


##########
server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.loading;
+
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+
+/**
+ * Contains recomputed configs from {@link CoordinatorDynamicConfig} based on
+ * whether {@code smartSegmentLoading} is enabled or not.
+ */
+public class SegmentLoadingConfig
+{
+  private static final Logger log = new Logger(SegmentLoadingConfig.class);
+
+  private final int maxSegmentsInLoadQueue;
+  private final int replicationThrottleLimit;
+  private final int maxReplicaAssignmentsInRun;
+  private final int maxLifetimeInLoadQueue;
+
+  private final int maxSegmentsToMove;
+  private final int percentDecommSegmentsToMove;
+
+  private final boolean useRoundRobinSegmentAssignment;
+  private final boolean emitBalancingStats;
+
+  public SegmentLoadingConfig(CoordinatorDynamicConfig dynamicConfig, int 
numUsedSegments)
+  {
+    if (dynamicConfig.isSmartSegmentLoading()) {
+      // Compute recommended values
+      this.maxSegmentsInLoadQueue = 0;
+      this.useRoundRobinSegmentAssignment = true;
+      this.emitBalancingStats = false;
+      this.maxLifetimeInLoadQueue = dynamicConfig.getReplicantLifetime();
+      this.maxReplicaAssignmentsInRun = Integer.MAX_VALUE;
+      this.percentDecommSegmentsToMove = 100;
+
+      // Impose a lower bound on both replicationThrottleLimit and 
maxSegmentsToMove
+      final int throttlePercentage = 2;
+      final int replicationThrottleLimit = Math.max(100, numUsedSegments * 
throttlePercentage / 100);
+
+      // Impose an upper bound on maxSegmentsToMove to ensure that coordinator
+      // run times are bounded. This limit can be relaxed as performance of
+      // the CostBalancerStrategy.computeCost() is improved.
+      final int maxSegmentsToMove = Math.min(1000, replicationThrottleLimit);
+
+      log.info(
+          "Smart segment loading is enabled. Recomputed 
replicationThrottleLimit"
+          + " [%,d] (%d%% of used segments [%,d]) and maxSegmentsToMove [%d].",
+          replicationThrottleLimit, throttlePercentage, numUsedSegments, 
maxSegmentsToMove
+      );
+
+      this.replicationThrottleLimit = replicationThrottleLimit;
+      this.maxSegmentsToMove = maxSegmentsToMove;
+    } else {
+      // Use the configured values
+      this.maxSegmentsInLoadQueue = 
dynamicConfig.getMaxSegmentsInNodeLoadingQueue();
+      this.replicationThrottleLimit = 
dynamicConfig.getReplicationThrottleLimit();
+      this.maxLifetimeInLoadQueue = dynamicConfig.getReplicantLifetime();
+      this.maxSegmentsToMove = dynamicConfig.getMaxSegmentsToMove();
+      this.useRoundRobinSegmentAssignment = 
dynamicConfig.isUseRoundRobinSegmentAssignment();
+      this.emitBalancingStats = dynamicConfig.emitBalancingStats();
+      this.maxReplicaAssignmentsInRun = 
dynamicConfig.getMaxNonPrimaryReplicantsToLoad();

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [CoordinatorDynamicConfig.getMaxNonPrimaryReplicantsToLoad](1) 
should be avoided because it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/5111)



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.loading;
+
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+
+/**
+ * Contains recomputed configs from {@link CoordinatorDynamicConfig} based on
+ * whether {@code smartSegmentLoading} is enabled or not.
+ */
+public class SegmentLoadingConfig
+{
+  private static final Logger log = new Logger(SegmentLoadingConfig.class);
+
+  private final int maxSegmentsInLoadQueue;
+  private final int replicationThrottleLimit;
+  private final int maxReplicaAssignmentsInRun;
+  private final int maxLifetimeInLoadQueue;
+
+  private final int maxSegmentsToMove;
+  private final int percentDecommSegmentsToMove;
+
+  private final boolean useRoundRobinSegmentAssignment;
+  private final boolean emitBalancingStats;
+
+  public SegmentLoadingConfig(CoordinatorDynamicConfig dynamicConfig, int 
numUsedSegments)
+  {
+    if (dynamicConfig.isSmartSegmentLoading()) {
+      // Compute recommended values
+      this.maxSegmentsInLoadQueue = 0;
+      this.useRoundRobinSegmentAssignment = true;
+      this.emitBalancingStats = false;
+      this.maxLifetimeInLoadQueue = dynamicConfig.getReplicantLifetime();
+      this.maxReplicaAssignmentsInRun = Integer.MAX_VALUE;
+      this.percentDecommSegmentsToMove = 100;
+
+      // Impose a lower bound on both replicationThrottleLimit and 
maxSegmentsToMove
+      final int throttlePercentage = 2;
+      final int replicationThrottleLimit = Math.max(100, numUsedSegments * 
throttlePercentage / 100);
+
+      // Impose an upper bound on maxSegmentsToMove to ensure that coordinator
+      // run times are bounded. This limit can be relaxed as performance of
+      // the CostBalancerStrategy.computeCost() is improved.
+      final int maxSegmentsToMove = Math.min(1000, replicationThrottleLimit);
+
+      log.info(
+          "Smart segment loading is enabled. Recomputed 
replicationThrottleLimit"
+          + " [%,d] (%d%% of used segments [%,d]) and maxSegmentsToMove [%d].",
+          replicationThrottleLimit, throttlePercentage, numUsedSegments, 
maxSegmentsToMove
+      );
+
+      this.replicationThrottleLimit = replicationThrottleLimit;
+      this.maxSegmentsToMove = maxSegmentsToMove;
+    } else {
+      // Use the configured values
+      this.maxSegmentsInLoadQueue = 
dynamicConfig.getMaxSegmentsInNodeLoadingQueue();
+      this.replicationThrottleLimit = 
dynamicConfig.getReplicationThrottleLimit();
+      this.maxLifetimeInLoadQueue = dynamicConfig.getReplicantLifetime();
+      this.maxSegmentsToMove = dynamicConfig.getMaxSegmentsToMove();
+      this.useRoundRobinSegmentAssignment = 
dynamicConfig.isUseRoundRobinSegmentAssignment();
+      this.emitBalancingStats = dynamicConfig.emitBalancingStats();
+      this.maxReplicaAssignmentsInRun = 
dynamicConfig.getMaxNonPrimaryReplicantsToLoad();
+      this.percentDecommSegmentsToMove = 
dynamicConfig.getDecommissioningMaxPercentOfMaxSegmentsToMove();

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking 
[CoordinatorDynamicConfig.getDecommissioningMaxPercentOfMaxSegmentsToMove](1) 
should be avoided because it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/5112)



##########
server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java:
##########
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.DruidCluster;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
+import 
org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
+import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
+import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.easymock.EasyMock;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class BalanceSegmentsTest
+{
+  private SegmentLoadQueueManager loadQueueManager;
+
+  private DataSegment segment1;
+  private DataSegment segment2;
+  private DataSegment segment3;
+  private DataSegment segment4;
+  private DataSegment segment5;
+
+  private DataSegment[] allSegments;
+
+  private ListeningExecutorService balancerStrategyExecutor;
+  private BalancerStrategy balancerStrategy;
+  private Set<String> broadcastDatasources;
+
+  private DruidServer server1;
+  private DruidServer server2;
+  private DruidServer server3;
+  private DruidServer server4;
+
+  @Before
+  public void setUp()
+  {
+    loadQueueManager = new SegmentLoadQueueManager(null, null, null);
+
+    // Create test segments for multiple datasources
+    final DateTime start1 = DateTimes.of("2012-01-01");
+    final DateTime start2 = DateTimes.of("2012-02-01");
+    final String version = DateTimes.of("2012-03-01").toString();
+
+    segment1 = createHourlySegment("datasource1", start1, version);
+    segment2 = createHourlySegment("datasource1", start2, version);
+    segment3 = createHourlySegment("datasource2", start1, version);
+    segment4 = createHourlySegment("datasource2", start2, version);
+    segment5 = createHourlySegment("datasourceBroadcast", start2, version);
+    allSegments = new DataSegment[]{segment1, segment2, segment3, segment4, 
segment5};
+
+    server1 = new DruidServer("server1", "server1", null, 100L, 
ServerType.HISTORICAL, "normal", 0);
+    server2 = new DruidServer("server2", "server2", null, 100L, 
ServerType.HISTORICAL, "normal", 0);
+    server3 = new DruidServer("server3", "server3", null, 100L, 
ServerType.HISTORICAL, "normal", 0);
+    server4 = new DruidServer("server4", "server4", null, 100L, 
ServerType.HISTORICAL, "normal", 0);
+
+    balancerStrategyExecutor = 
MoreExecutors.listeningDecorator(Execs.multiThreaded(1, 
"BalanceSegmentsTest-%d"));
+    balancerStrategy = new 
CostBalancerStrategyFactory().createBalancerStrategy(balancerStrategyExecutor);
+
+    broadcastDatasources = Collections.singleton("datasourceBroadcast");
+  }
+
+  @After
+  public void tearDown()
+  {
+    balancerStrategyExecutor.shutdownNow();
+  }
+
+  @Test
+  public void testMoveToEmptyServerBalancer()
+  {
+    final ServerHolder serverHolder1 = createHolder(server1, segment1, 
segment2, segment3, segment4);
+    final ServerHolder serverHolder2 = createHolder(server2);
+
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(serverHolder1, serverHolder2)
+            .withBalancerStrategy(balancerStrategy)
+            .withBroadcastDatasources(broadcastDatasources)
+            .build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+    long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource1")
+                      + stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource2");
+    Assert.assertEquals(2L, totalMoved);
+  }
+
+  /**
+   * Server 1 has 2 segments.
+   * Server 2 (decommissioning) has 2 segments.
+   * Server 3 is empty.
+   * Decommissioning percent is 60.
+   * Max segments to move is 3.
+   * 2 (of 2) segments should be moved from Server 2 and 1 (of 2) from Server 
1.
+   */
+  @Test
+  public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMove()
+  {
+    final ServerHolder serverHolder1 = createHolder(server1, false, segment1, 
segment2);
+    final ServerHolder serverHolder2 = createHolder(server2, true, segment3, 
segment4);
+    final ServerHolder serverHolder3 = createHolder(server3, false);
+
+    BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
+    expectFindDestinationAndReturn(strategy, serverHolder3);
+    EasyMock.replay(strategy);
+
+    // ceil(3 * 0.6) = 2 segments from decommissioning servers
+    CoordinatorDynamicConfig dynamicConfig =
+        CoordinatorDynamicConfig.builder()
+                                .withSmartSegmentLoading(false)
+                                .withMaxSegmentsToMove(3)
+                                
.withDecommissioningMaxPercentOfMaxSegmentsToMove(60)
+                                .build();
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(serverHolder1, serverHolder2, 
serverHolder3)
+            .withDynamicConfigs(dynamicConfig)
+            .withBalancerStrategy(strategy)
+            .withBroadcastDatasources(broadcastDatasources)
+            .withSegmentAssignerUsing(loadQueueManager)
+            .build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+
+    EasyMock.verify(strategy);
+
+    // 2 segments are moved from the decommissioning server and 1 from the 
active server
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.MOVED, 
"normal", "datasource1"));
+    Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.MOVED, 
"normal", "datasource2"));
+    Set<DataSegment> segmentsMoved = 
serverHolder3.getPeon().getSegmentsToLoad();
+    Assert.assertTrue(segmentsMoved.contains(segment3));
+    Assert.assertTrue(segmentsMoved.contains(segment4));
+    Assert.assertTrue(segmentsMoved.contains(segment1) || 
segmentsMoved.contains(segment2));
+  }
+
+  @Test
+  public void testZeroDecommissioningMaxPercentOfMaxSegmentsToMove()
+  {
+    final ServerHolder holder1 = createHolder(server1, false, segment1, 
segment2);
+    final ServerHolder holder2 = createHolder(server2, true, segment3, 
segment4);
+    final ServerHolder holder3 = createHolder(server3, false);
+
+    CoordinatorDynamicConfig dynamicConfig =
+        CoordinatorDynamicConfig.builder()
+                                .withSmartSegmentLoading(false)
+                                
.withDecommissioningMaxPercentOfMaxSegmentsToMove(0)
+                                .withMaxSegmentsToMove(1).build();
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(holder1, holder2, 
holder3).withDynamicConfigs(dynamicConfig).build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+
+    // Verify that either segment1 or segment2 is chosen for move
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.MOVED, 
"normal", segment1.getDataSource()));
+    DataSegment movingSegment = 
holder3.getPeon().getSegmentsToLoad().iterator().next();
+    Assert.assertEquals(segment1.getDataSource(), 
movingSegment.getDataSource());
+  }
+
+  @Test
+  public void testMaxDecommissioningMaxPercentOfMaxSegmentsToMove()
+  {
+    final ServerHolder holder1 = createHolder(server1, false, segment1, 
segment2);
+    final ServerHolder holder2 = createHolder(server2, true, segment3, 
segment4);
+    final ServerHolder holder3 = createHolder(server3, false);
+
+    CoordinatorDynamicConfig dynamicConfig =
+        CoordinatorDynamicConfig.builder()
+                                .withSmartSegmentLoading(false)
+                                
.withDecommissioningMaxPercentOfMaxSegmentsToMove(100)
+                                .withMaxSegmentsToMove(1).build();
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(holder1, holder2, 
holder3).withDynamicConfigs(dynamicConfig).build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+
+    // Verify that either segment3 or segment4 is chosen for move
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.MOVED, 
"normal", segment3.getDataSource()));
+    DataSegment movingSegment = 
holder3.getPeon().getSegmentsToLoad().iterator().next();
+    Assert.assertEquals(segment3.getDataSource(), 
movingSegment.getDataSource());
+  }
+
+  /**
+   * Should balance segments as usual (ignoring percent) with empty 
decommissioningNodes.
+   */
+  @Test
+  public void 
testMoveDecommissioningMaxPercentOfMaxSegmentsToMoveWithNoDecommissioning()
+  {
+    final ServerHolder serverHolder1 = createHolder(server1, segment1, 
segment2);
+    final ServerHolder serverHolder2 = createHolder(server2, segment3, 
segment4);
+    final ServerHolder serverHolder3 = createHolder(server3);
+
+    BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
+    expectFindDestinationAndReturn(strategy, serverHolder3);
+    EasyMock.replay(strategy);
+
+    CoordinatorDynamicConfig dynamicConfig =
+        CoordinatorDynamicConfig.builder()
+                                .withSmartSegmentLoading(false)
+                                .withMaxSegmentsToMove(3)
+                                
.withDecommissioningMaxPercentOfMaxSegmentsToMove(9)
+                                .build();
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(serverHolder1, serverHolder2, 
serverHolder3)
+            .withDynamicConfigs(dynamicConfig)
+            .withBalancerStrategy(strategy)
+            .withSegmentAssignerUsing(loadQueueManager)
+            .build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+    EasyMock.verify(strategy);
+    long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource1")
+                      + stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource2");
+    Assert.assertEquals(3L, totalMoved);
+    Assert.assertEquals(3, serverHolder3.getPeon().getSegmentsToLoad().size());
+  }
+
+  /**
+   * Shouldn't move segments to a decommissioning server.
+   */
+  @Test
+  public void testMoveToDecommissioningServer()
+  {
+    final ServerHolder activeServer = createHolder(server1, false, 
allSegments);
+    final ServerHolder decommissioningServer = createHolder(server2, true);
+
+    BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
+    expectFindDestinationAndReturn(strategy, decommissioningServer);
+    EasyMock.replay(strategy);
+
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(activeServer, decommissioningServer)
+            .withBalancerStrategy(strategy)
+            .withBroadcastDatasources(broadcastDatasources)
+            .build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+    EasyMock.verify(strategy);
+    Assert.assertFalse(stats.hasStat(Stats.Segments.MOVED));
+  }
+
+  @Test
+  public void testMoveFromDecommissioningServer()
+  {
+    final ServerHolder decommissioningServer = createHolder(server1, true, 
allSegments);
+    final ServerHolder activeServer = createHolder(server2);
+
+    BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
+    expectFindDestinationAndReturn(strategy, activeServer);
+    EasyMock.replay(strategy);
+
+    DruidCoordinatorRuntimeParams params = 
defaultRuntimeParamsBuilder(decommissioningServer, activeServer)
+        .withDynamicConfigs(
+            CoordinatorDynamicConfig.builder()
+                                    .withSmartSegmentLoading(false)
+                                    .withMaxSegmentsToMove(3).build()
+        )
+        .withBalancerStrategy(strategy)
+        .withBroadcastDatasources(broadcastDatasources)
+        .build();
+
+    runBalancer(params);
+    EasyMock.verify(strategy);
+    Assert.assertEquals(0, 
decommissioningServer.getPeon().getSegmentsToLoad().size());
+    Assert.assertEquals(3, activeServer.getPeon().getSegmentsToLoad().size());
+  }
+
+  @Test
+  public void testMoveMaxLoadQueueServerBalancer()
+  {
+    final int maxSegmentsInQueue = 1;
+    final ServerHolder holder1 = createHolder(server1, maxSegmentsInQueue, 
false, allSegments);
+    final ServerHolder holder2 = createHolder(server2, maxSegmentsInQueue, 
false);
+
+    final CoordinatorDynamicConfig dynamicConfig = CoordinatorDynamicConfig
+        .builder()
+        .withSmartSegmentLoading(false)
+        .withMaxSegmentsInNodeLoadingQueue(maxSegmentsInQueue)
+        .build();
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(holder1, holder2)
+            .withDynamicConfigs(dynamicConfig)
+            .build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+
+    // max to move is 5, all segments on server 1, but only expect to move 1 
to server 2 since max node load queue is 1
+    Assert.assertEquals(maxSegmentsInQueue, 
stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource1"));
+  }
+
+  @Test
+  public void testRun1()
+  {
+    // Mock some servers of different usages
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
+        createHolder(server1, allSegments),
+        createHolder(server2)
+    ).build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+    Assert.assertTrue(stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource1") > 0);
+  }
+
+  @Test
+  public void testRun2()
+  {
+    // Mock some servers of different usages
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
+        createHolder(server1, allSegments),
+        createHolder(server2),
+        createHolder(server3),
+        createHolder(server4)
+    ).build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+    Assert.assertTrue(stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource1") > 0);
+  }
+
+  @Test
+  public void testMaxSegmentsToMoveIsHonored()
+  {
+    // Move from non-decommissioning servers
+    final ServerHolder holder1 = createHolder(server1, segment1, segment2);
+    final ServerHolder holder2 = createHolder(server2, segment3, segment4);
+    final ServerHolder holder3 = createHolder(server3);
+
+    BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
+    expectFindDestinationAndReturn(strategy, holder3);
+    EasyMock.replay(strategy);
+
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(holder1, holder2, holder3)
+            .withDynamicConfigs(
+                CoordinatorDynamicConfig.builder()
+                                        .withSmartSegmentLoading(false)
+                                        .withMaxSegmentsToMove(1)
+                                        .withUseBatchedSegmentSampler(true)
+                                        
.withPercentOfSegmentsToConsiderPerMove(40)
+                                        .build()
+            )
+            .withBalancerStrategy(strategy)
+            .withBroadcastDatasources(broadcastDatasources)
+            .build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+    EasyMock.verify(strategy);
+    long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource1")
+                      + stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource2");
+    Assert.assertEquals(1L, totalMoved);
+    Assert.assertEquals(1, holder3.getPeon().getSegmentsToLoad().size());
+  }
+
+  @Test
+  public void testUseBatchedSegmentSampler()
+  {
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
+        createHolder(server1, allSegments),
+        createHolder(server2),
+        createHolder(server3),
+        createHolder(server4)
+    )
+        .withDynamicConfigs(
+            CoordinatorDynamicConfig.builder()
+                                    .withSmartSegmentLoading(false)
+                                    .withMaxSegmentsToMove(2)
+                                    .withUseBatchedSegmentSampler(true)

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [Builder.withUseBatchedSegmentSampler](1) should be avoided because 
it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/5115)



##########
server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java:
##########
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.DruidCluster;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
+import 
org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
+import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
+import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.easymock.EasyMock;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class BalanceSegmentsTest
+{
+  private SegmentLoadQueueManager loadQueueManager;
+
+  private DataSegment segment1;
+  private DataSegment segment2;
+  private DataSegment segment3;
+  private DataSegment segment4;
+  private DataSegment segment5;
+
+  private DataSegment[] allSegments;
+
+  private ListeningExecutorService balancerStrategyExecutor;
+  private BalancerStrategy balancerStrategy;
+  private Set<String> broadcastDatasources;
+
+  private DruidServer server1;
+  private DruidServer server2;
+  private DruidServer server3;
+  private DruidServer server4;
+
+  @Before
+  public void setUp()
+  {
+    loadQueueManager = new SegmentLoadQueueManager(null, null, null);
+
+    // Create test segments for multiple datasources
+    final DateTime start1 = DateTimes.of("2012-01-01");
+    final DateTime start2 = DateTimes.of("2012-02-01");
+    final String version = DateTimes.of("2012-03-01").toString();
+
+    segment1 = createHourlySegment("datasource1", start1, version);
+    segment2 = createHourlySegment("datasource1", start2, version);
+    segment3 = createHourlySegment("datasource2", start1, version);
+    segment4 = createHourlySegment("datasource2", start2, version);
+    segment5 = createHourlySegment("datasourceBroadcast", start2, version);
+    allSegments = new DataSegment[]{segment1, segment2, segment3, segment4, 
segment5};
+
+    server1 = new DruidServer("server1", "server1", null, 100L, 
ServerType.HISTORICAL, "normal", 0);
+    server2 = new DruidServer("server2", "server2", null, 100L, 
ServerType.HISTORICAL, "normal", 0);
+    server3 = new DruidServer("server3", "server3", null, 100L, 
ServerType.HISTORICAL, "normal", 0);
+    server4 = new DruidServer("server4", "server4", null, 100L, 
ServerType.HISTORICAL, "normal", 0);
+
+    balancerStrategyExecutor = 
MoreExecutors.listeningDecorator(Execs.multiThreaded(1, 
"BalanceSegmentsTest-%d"));
+    balancerStrategy = new 
CostBalancerStrategyFactory().createBalancerStrategy(balancerStrategyExecutor);
+
+    broadcastDatasources = Collections.singleton("datasourceBroadcast");
+  }
+
+  @After
+  public void tearDown()
+  {
+    balancerStrategyExecutor.shutdownNow();
+  }
+
+  @Test
+  public void testMoveToEmptyServerBalancer()
+  {
+    final ServerHolder serverHolder1 = createHolder(server1, segment1, 
segment2, segment3, segment4);
+    final ServerHolder serverHolder2 = createHolder(server2);
+
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(serverHolder1, serverHolder2)
+            .withBalancerStrategy(balancerStrategy)
+            .withBroadcastDatasources(broadcastDatasources)
+            .build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+    long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource1")
+                      + stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource2");
+    Assert.assertEquals(2L, totalMoved);
+  }
+
+  /**
+   * Server 1 has 2 segments.
+   * Server 2 (decommissioning) has 2 segments.
+   * Server 3 is empty.
+   * Decommissioning percent is 60.
+   * Max segments to move is 3.
+   * 2 (of 2) segments should be moved from Server 2 and 1 (of 2) from Server 
1.
+   */
+  @Test
+  public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMove()
+  {
+    final ServerHolder serverHolder1 = createHolder(server1, false, segment1, 
segment2);
+    final ServerHolder serverHolder2 = createHolder(server2, true, segment3, 
segment4);
+    final ServerHolder serverHolder3 = createHolder(server3, false);
+
+    BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
+    expectFindDestinationAndReturn(strategy, serverHolder3);
+    EasyMock.replay(strategy);
+
+    // ceil(3 * 0.6) = 2 segments from decommissioning servers
+    CoordinatorDynamicConfig dynamicConfig =
+        CoordinatorDynamicConfig.builder()
+                                .withSmartSegmentLoading(false)
+                                .withMaxSegmentsToMove(3)
+                                
.withDecommissioningMaxPercentOfMaxSegmentsToMove(60)
+                                .build();
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(serverHolder1, serverHolder2, 
serverHolder3)
+            .withDynamicConfigs(dynamicConfig)
+            .withBalancerStrategy(strategy)
+            .withBroadcastDatasources(broadcastDatasources)
+            .withSegmentAssignerUsing(loadQueueManager)
+            .build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+
+    EasyMock.verify(strategy);
+
+    // 2 segments are moved from the decommissioning server and 1 from the 
active server
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.MOVED, 
"normal", "datasource1"));
+    Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.MOVED, 
"normal", "datasource2"));
+    Set<DataSegment> segmentsMoved = 
serverHolder3.getPeon().getSegmentsToLoad();
+    Assert.assertTrue(segmentsMoved.contains(segment3));
+    Assert.assertTrue(segmentsMoved.contains(segment4));
+    Assert.assertTrue(segmentsMoved.contains(segment1) || 
segmentsMoved.contains(segment2));
+  }
+
+  @Test
+  public void testZeroDecommissioningMaxPercentOfMaxSegmentsToMove()
+  {
+    final ServerHolder holder1 = createHolder(server1, false, segment1, 
segment2);
+    final ServerHolder holder2 = createHolder(server2, true, segment3, 
segment4);
+    final ServerHolder holder3 = createHolder(server3, false);
+
+    CoordinatorDynamicConfig dynamicConfig =
+        CoordinatorDynamicConfig.builder()
+                                .withSmartSegmentLoading(false)
+                                
.withDecommissioningMaxPercentOfMaxSegmentsToMove(0)
+                                .withMaxSegmentsToMove(1).build();
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(holder1, holder2, 
holder3).withDynamicConfigs(dynamicConfig).build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+
+    // Verify that either segment1 or segment2 is chosen for move
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.MOVED, 
"normal", segment1.getDataSource()));
+    DataSegment movingSegment = 
holder3.getPeon().getSegmentsToLoad().iterator().next();
+    Assert.assertEquals(segment1.getDataSource(), 
movingSegment.getDataSource());
+  }
+
+  @Test
+  public void testMaxDecommissioningMaxPercentOfMaxSegmentsToMove()
+  {
+    final ServerHolder holder1 = createHolder(server1, false, segment1, 
segment2);
+    final ServerHolder holder2 = createHolder(server2, true, segment3, 
segment4);
+    final ServerHolder holder3 = createHolder(server3, false);
+
+    CoordinatorDynamicConfig dynamicConfig =
+        CoordinatorDynamicConfig.builder()
+                                .withSmartSegmentLoading(false)
+                                
.withDecommissioningMaxPercentOfMaxSegmentsToMove(100)
+                                .withMaxSegmentsToMove(1).build();
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(holder1, holder2, 
holder3).withDynamicConfigs(dynamicConfig).build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+
+    // Verify that either segment3 or segment4 is chosen for move
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.MOVED, 
"normal", segment3.getDataSource()));
+    DataSegment movingSegment = 
holder3.getPeon().getSegmentsToLoad().iterator().next();
+    Assert.assertEquals(segment3.getDataSource(), 
movingSegment.getDataSource());
+  }
+
+  /**
+   * Should balance segments as usual (ignoring percent) with empty 
decommissioningNodes.
+   */
+  @Test
+  public void 
testMoveDecommissioningMaxPercentOfMaxSegmentsToMoveWithNoDecommissioning()
+  {
+    final ServerHolder serverHolder1 = createHolder(server1, segment1, 
segment2);
+    final ServerHolder serverHolder2 = createHolder(server2, segment3, 
segment4);
+    final ServerHolder serverHolder3 = createHolder(server3);
+
+    BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
+    expectFindDestinationAndReturn(strategy, serverHolder3);
+    EasyMock.replay(strategy);
+
+    CoordinatorDynamicConfig dynamicConfig =
+        CoordinatorDynamicConfig.builder()
+                                .withSmartSegmentLoading(false)
+                                .withMaxSegmentsToMove(3)
+                                
.withDecommissioningMaxPercentOfMaxSegmentsToMove(9)
+                                .build();
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(serverHolder1, serverHolder2, 
serverHolder3)
+            .withDynamicConfigs(dynamicConfig)
+            .withBalancerStrategy(strategy)
+            .withSegmentAssignerUsing(loadQueueManager)
+            .build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+    EasyMock.verify(strategy);
+    long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource1")
+                      + stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource2");
+    Assert.assertEquals(3L, totalMoved);
+    Assert.assertEquals(3, serverHolder3.getPeon().getSegmentsToLoad().size());
+  }
+
+  /**
+   * Shouldn't move segments to a decommissioning server.
+   */
+  @Test
+  public void testMoveToDecommissioningServer()
+  {
+    final ServerHolder activeServer = createHolder(server1, false, 
allSegments);
+    final ServerHolder decommissioningServer = createHolder(server2, true);
+
+    BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
+    expectFindDestinationAndReturn(strategy, decommissioningServer);
+    EasyMock.replay(strategy);
+
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(activeServer, decommissioningServer)
+            .withBalancerStrategy(strategy)
+            .withBroadcastDatasources(broadcastDatasources)
+            .build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+    EasyMock.verify(strategy);
+    Assert.assertFalse(stats.hasStat(Stats.Segments.MOVED));
+  }
+
+  @Test
+  public void testMoveFromDecommissioningServer()
+  {
+    final ServerHolder decommissioningServer = createHolder(server1, true, 
allSegments);
+    final ServerHolder activeServer = createHolder(server2);
+
+    BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
+    expectFindDestinationAndReturn(strategy, activeServer);
+    EasyMock.replay(strategy);
+
+    DruidCoordinatorRuntimeParams params = 
defaultRuntimeParamsBuilder(decommissioningServer, activeServer)
+        .withDynamicConfigs(
+            CoordinatorDynamicConfig.builder()
+                                    .withSmartSegmentLoading(false)
+                                    .withMaxSegmentsToMove(3).build()
+        )
+        .withBalancerStrategy(strategy)
+        .withBroadcastDatasources(broadcastDatasources)
+        .build();
+
+    runBalancer(params);
+    EasyMock.verify(strategy);
+    Assert.assertEquals(0, 
decommissioningServer.getPeon().getSegmentsToLoad().size());
+    Assert.assertEquals(3, activeServer.getPeon().getSegmentsToLoad().size());
+  }
+
+  @Test
+  public void testMoveMaxLoadQueueServerBalancer()
+  {
+    final int maxSegmentsInQueue = 1;
+    final ServerHolder holder1 = createHolder(server1, maxSegmentsInQueue, 
false, allSegments);
+    final ServerHolder holder2 = createHolder(server2, maxSegmentsInQueue, 
false);
+
+    final CoordinatorDynamicConfig dynamicConfig = CoordinatorDynamicConfig
+        .builder()
+        .withSmartSegmentLoading(false)
+        .withMaxSegmentsInNodeLoadingQueue(maxSegmentsInQueue)
+        .build();
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(holder1, holder2)
+            .withDynamicConfigs(dynamicConfig)
+            .build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+
+    // max to move is 5, all segments on server 1, but only expect to move 1 
to server 2 since max node load queue is 1
+    Assert.assertEquals(maxSegmentsInQueue, 
stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource1"));
+  }
+
+  @Test
+  public void testRun1()
+  {
+    // Mock some servers of different usages
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
+        createHolder(server1, allSegments),
+        createHolder(server2)
+    ).build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+    Assert.assertTrue(stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource1") > 0);
+  }
+
+  @Test
+  public void testRun2()
+  {
+    // Mock some servers of different usages
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
+        createHolder(server1, allSegments),
+        createHolder(server2),
+        createHolder(server3),
+        createHolder(server4)
+    ).build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+    Assert.assertTrue(stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource1") > 0);
+  }
+
+  @Test
+  public void testMaxSegmentsToMoveIsHonored()
+  {
+    // Move from non-decommissioning servers
+    final ServerHolder holder1 = createHolder(server1, segment1, segment2);
+    final ServerHolder holder2 = createHolder(server2, segment3, segment4);
+    final ServerHolder holder3 = createHolder(server3);
+
+    BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
+    expectFindDestinationAndReturn(strategy, holder3);
+    EasyMock.replay(strategy);
+
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(holder1, holder2, holder3)
+            .withDynamicConfigs(
+                CoordinatorDynamicConfig.builder()
+                                        .withSmartSegmentLoading(false)
+                                        .withMaxSegmentsToMove(1)
+                                        .withUseBatchedSegmentSampler(true)

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [Builder.withUseBatchedSegmentSampler](1) should be avoided because 
it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/5114)



##########
server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java:
##########
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.duty;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.DruidCluster;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
+import 
org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
+import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
+import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.easymock.EasyMock;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Set;
+
+public class BalanceSegmentsTest
+{
+  private SegmentLoadQueueManager loadQueueManager;
+
+  private DataSegment segment1;
+  private DataSegment segment2;
+  private DataSegment segment3;
+  private DataSegment segment4;
+  private DataSegment segment5;
+
+  private DataSegment[] allSegments;
+
+  private ListeningExecutorService balancerStrategyExecutor;
+  private BalancerStrategy balancerStrategy;
+  private Set<String> broadcastDatasources;
+
+  private DruidServer server1;
+  private DruidServer server2;
+  private DruidServer server3;
+  private DruidServer server4;
+
+  @Before
+  public void setUp()
+  {
+    loadQueueManager = new SegmentLoadQueueManager(null, null, null);
+
+    // Create test segments for multiple datasources
+    final DateTime start1 = DateTimes.of("2012-01-01");
+    final DateTime start2 = DateTimes.of("2012-02-01");
+    final String version = DateTimes.of("2012-03-01").toString();
+
+    segment1 = createHourlySegment("datasource1", start1, version);
+    segment2 = createHourlySegment("datasource1", start2, version);
+    segment3 = createHourlySegment("datasource2", start1, version);
+    segment4 = createHourlySegment("datasource2", start2, version);
+    segment5 = createHourlySegment("datasourceBroadcast", start2, version);
+    allSegments = new DataSegment[]{segment1, segment2, segment3, segment4, 
segment5};
+
+    server1 = new DruidServer("server1", "server1", null, 100L, 
ServerType.HISTORICAL, "normal", 0);
+    server2 = new DruidServer("server2", "server2", null, 100L, 
ServerType.HISTORICAL, "normal", 0);
+    server3 = new DruidServer("server3", "server3", null, 100L, 
ServerType.HISTORICAL, "normal", 0);
+    server4 = new DruidServer("server4", "server4", null, 100L, 
ServerType.HISTORICAL, "normal", 0);
+
+    balancerStrategyExecutor = 
MoreExecutors.listeningDecorator(Execs.multiThreaded(1, 
"BalanceSegmentsTest-%d"));
+    balancerStrategy = new 
CostBalancerStrategyFactory().createBalancerStrategy(balancerStrategyExecutor);
+
+    broadcastDatasources = Collections.singleton("datasourceBroadcast");
+  }
+
+  @After
+  public void tearDown()
+  {
+    balancerStrategyExecutor.shutdownNow();
+  }
+
+  @Test
+  public void testMoveToEmptyServerBalancer()
+  {
+    final ServerHolder serverHolder1 = createHolder(server1, segment1, 
segment2, segment3, segment4);
+    final ServerHolder serverHolder2 = createHolder(server2);
+
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(serverHolder1, serverHolder2)
+            .withBalancerStrategy(balancerStrategy)
+            .withBroadcastDatasources(broadcastDatasources)
+            .build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+    long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource1")
+                      + stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource2");
+    Assert.assertEquals(2L, totalMoved);
+  }
+
+  /**
+   * Server 1 has 2 segments.
+   * Server 2 (decommissioning) has 2 segments.
+   * Server 3 is empty.
+   * Decommissioning percent is 60.
+   * Max segments to move is 3.
+   * 2 (of 2) segments should be moved from Server 2 and 1 (of 2) from Server 
1.
+   */
+  @Test
+  public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMove()
+  {
+    final ServerHolder serverHolder1 = createHolder(server1, false, segment1, 
segment2);
+    final ServerHolder serverHolder2 = createHolder(server2, true, segment3, 
segment4);
+    final ServerHolder serverHolder3 = createHolder(server3, false);
+
+    BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
+    expectFindDestinationAndReturn(strategy, serverHolder3);
+    EasyMock.replay(strategy);
+
+    // ceil(3 * 0.6) = 2 segments from decommissioning servers
+    CoordinatorDynamicConfig dynamicConfig =
+        CoordinatorDynamicConfig.builder()
+                                .withSmartSegmentLoading(false)
+                                .withMaxSegmentsToMove(3)
+                                
.withDecommissioningMaxPercentOfMaxSegmentsToMove(60)
+                                .build();
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(serverHolder1, serverHolder2, 
serverHolder3)
+            .withDynamicConfigs(dynamicConfig)
+            .withBalancerStrategy(strategy)
+            .withBroadcastDatasources(broadcastDatasources)
+            .withSegmentAssignerUsing(loadQueueManager)
+            .build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+
+    EasyMock.verify(strategy);
+
+    // 2 segments are moved from the decommissioning server and 1 from the 
active server
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.MOVED, 
"normal", "datasource1"));
+    Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.MOVED, 
"normal", "datasource2"));
+    Set<DataSegment> segmentsMoved = 
serverHolder3.getPeon().getSegmentsToLoad();
+    Assert.assertTrue(segmentsMoved.contains(segment3));
+    Assert.assertTrue(segmentsMoved.contains(segment4));
+    Assert.assertTrue(segmentsMoved.contains(segment1) || 
segmentsMoved.contains(segment2));
+  }
+
+  @Test
+  public void testZeroDecommissioningMaxPercentOfMaxSegmentsToMove()
+  {
+    final ServerHolder holder1 = createHolder(server1, false, segment1, 
segment2);
+    final ServerHolder holder2 = createHolder(server2, true, segment3, 
segment4);
+    final ServerHolder holder3 = createHolder(server3, false);
+
+    CoordinatorDynamicConfig dynamicConfig =
+        CoordinatorDynamicConfig.builder()
+                                .withSmartSegmentLoading(false)
+                                
.withDecommissioningMaxPercentOfMaxSegmentsToMove(0)
+                                .withMaxSegmentsToMove(1).build();
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(holder1, holder2, 
holder3).withDynamicConfigs(dynamicConfig).build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+
+    // Verify that either segment1 or segment2 is chosen for move
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.MOVED, 
"normal", segment1.getDataSource()));
+    DataSegment movingSegment = 
holder3.getPeon().getSegmentsToLoad().iterator().next();
+    Assert.assertEquals(segment1.getDataSource(), 
movingSegment.getDataSource());
+  }
+
+  @Test
+  public void testMaxDecommissioningMaxPercentOfMaxSegmentsToMove()
+  {
+    final ServerHolder holder1 = createHolder(server1, false, segment1, 
segment2);
+    final ServerHolder holder2 = createHolder(server2, true, segment3, 
segment4);
+    final ServerHolder holder3 = createHolder(server3, false);
+
+    CoordinatorDynamicConfig dynamicConfig =
+        CoordinatorDynamicConfig.builder()
+                                .withSmartSegmentLoading(false)
+                                
.withDecommissioningMaxPercentOfMaxSegmentsToMove(100)
+                                .withMaxSegmentsToMove(1).build();
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(holder1, holder2, 
holder3).withDynamicConfigs(dynamicConfig).build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+
+    // Verify that either segment3 or segment4 is chosen for move
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.MOVED, 
"normal", segment3.getDataSource()));
+    DataSegment movingSegment = 
holder3.getPeon().getSegmentsToLoad().iterator().next();
+    Assert.assertEquals(segment3.getDataSource(), 
movingSegment.getDataSource());
+  }
+
+  /**
+   * Should balance segments as usual (ignoring percent) with empty 
decommissioningNodes.
+   */
+  @Test
+  public void 
testMoveDecommissioningMaxPercentOfMaxSegmentsToMoveWithNoDecommissioning()
+  {
+    final ServerHolder serverHolder1 = createHolder(server1, segment1, 
segment2);
+    final ServerHolder serverHolder2 = createHolder(server2, segment3, 
segment4);
+    final ServerHolder serverHolder3 = createHolder(server3);
+
+    BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
+    expectFindDestinationAndReturn(strategy, serverHolder3);
+    EasyMock.replay(strategy);
+
+    CoordinatorDynamicConfig dynamicConfig =
+        CoordinatorDynamicConfig.builder()
+                                .withSmartSegmentLoading(false)
+                                .withMaxSegmentsToMove(3)
+                                
.withDecommissioningMaxPercentOfMaxSegmentsToMove(9)
+                                .build();
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(serverHolder1, serverHolder2, 
serverHolder3)
+            .withDynamicConfigs(dynamicConfig)
+            .withBalancerStrategy(strategy)
+            .withSegmentAssignerUsing(loadQueueManager)
+            .build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+    EasyMock.verify(strategy);
+    long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource1")
+                      + stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource2");
+    Assert.assertEquals(3L, totalMoved);
+    Assert.assertEquals(3, serverHolder3.getPeon().getSegmentsToLoad().size());
+  }
+
+  /**
+   * Shouldn't move segments to a decommissioning server.
+   */
+  @Test
+  public void testMoveToDecommissioningServer()
+  {
+    final ServerHolder activeServer = createHolder(server1, false, 
allSegments);
+    final ServerHolder decommissioningServer = createHolder(server2, true);
+
+    BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
+    expectFindDestinationAndReturn(strategy, decommissioningServer);
+    EasyMock.replay(strategy);
+
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(activeServer, decommissioningServer)
+            .withBalancerStrategy(strategy)
+            .withBroadcastDatasources(broadcastDatasources)
+            .build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+    EasyMock.verify(strategy);
+    Assert.assertFalse(stats.hasStat(Stats.Segments.MOVED));
+  }
+
+  @Test
+  public void testMoveFromDecommissioningServer()
+  {
+    final ServerHolder decommissioningServer = createHolder(server1, true, 
allSegments);
+    final ServerHolder activeServer = createHolder(server2);
+
+    BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
+    expectFindDestinationAndReturn(strategy, activeServer);
+    EasyMock.replay(strategy);
+
+    DruidCoordinatorRuntimeParams params = 
defaultRuntimeParamsBuilder(decommissioningServer, activeServer)
+        .withDynamicConfigs(
+            CoordinatorDynamicConfig.builder()
+                                    .withSmartSegmentLoading(false)
+                                    .withMaxSegmentsToMove(3).build()
+        )
+        .withBalancerStrategy(strategy)
+        .withBroadcastDatasources(broadcastDatasources)
+        .build();
+
+    runBalancer(params);
+    EasyMock.verify(strategy);
+    Assert.assertEquals(0, 
decommissioningServer.getPeon().getSegmentsToLoad().size());
+    Assert.assertEquals(3, activeServer.getPeon().getSegmentsToLoad().size());
+  }
+
+  @Test
+  public void testMoveMaxLoadQueueServerBalancer()
+  {
+    final int maxSegmentsInQueue = 1;
+    final ServerHolder holder1 = createHolder(server1, maxSegmentsInQueue, 
false, allSegments);
+    final ServerHolder holder2 = createHolder(server2, maxSegmentsInQueue, 
false);
+
+    final CoordinatorDynamicConfig dynamicConfig = CoordinatorDynamicConfig
+        .builder()
+        .withSmartSegmentLoading(false)
+        .withMaxSegmentsInNodeLoadingQueue(maxSegmentsInQueue)
+        .build();
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(holder1, holder2)
+            .withDynamicConfigs(dynamicConfig)
+            .build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+
+    // max to move is 5, all segments on server 1, but only expect to move 1 
to server 2 since max node load queue is 1
+    Assert.assertEquals(maxSegmentsInQueue, 
stats.getSegmentStat(Stats.Segments.MOVED, "normal", "datasource1"));
+  }
+
+  @Test
+  public void testRun1()
+  {
+    // Mock some servers of different usages
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
+        createHolder(server1, allSegments),
+        createHolder(server2)
+    ).build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+    Assert.assertTrue(stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource1") > 0);
+  }
+
+  @Test
+  public void testRun2()
+  {
+    // Mock some servers of different usages
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
+        createHolder(server1, allSegments),
+        createHolder(server2),
+        createHolder(server3),
+        createHolder(server4)
+    ).build();
+
+    CoordinatorRunStats stats = runBalancer(params);
+    Assert.assertTrue(stats.getSegmentStat(Stats.Segments.MOVED, "normal", 
"datasource1") > 0);
+  }
+
+  @Test
+  public void testMaxSegmentsToMoveIsHonored()
+  {
+    // Move from non-decommissioning servers
+    final ServerHolder holder1 = createHolder(server1, segment1, segment2);
+    final ServerHolder holder2 = createHolder(server2, segment3, segment4);
+    final ServerHolder holder3 = createHolder(server3);
+
+    BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
+    expectFindDestinationAndReturn(strategy, holder3);
+    EasyMock.replay(strategy);
+
+    DruidCoordinatorRuntimeParams params =
+        defaultRuntimeParamsBuilder(holder1, holder2, holder3)
+            .withDynamicConfigs(
+                CoordinatorDynamicConfig.builder()
+                                        .withSmartSegmentLoading(false)
+                                        .withMaxSegmentsToMove(1)
+                                        .withUseBatchedSegmentSampler(true)
+                                        
.withPercentOfSegmentsToConsiderPerMove(40)

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [Builder.withPercentOfSegmentsToConsiderPerMove](1) should be 
avoided because it has been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/5113)



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -459,145 +500,127 @@
   }
 
   @Override
-  public int getNumberOfSegmentsInQueue()
+  public Set<DataSegment> getSegmentsMarkedToDrop()
   {
-    return segmentsToLoad.size();
+    return Collections.unmodifiableSet(segmentsMarkedToDrop);
   }
 
-  @Override
-  public Set<DataSegment> getSegmentsMarkedToDrop()
+  /**
+   * A request is considered to have timed out if the time elapsed since it was
+   * first sent to the server is greater than the configured load timeout.
+   *
+   * @see DruidCoordinatorConfig#getLoadTimeoutDelay()
+   */
+  private boolean hasRequestTimedOut(SegmentHolder holder, long 
currentTimeMillis)
   {
-    return Collections.unmodifiableSet(segmentsMarkedToDrop);
+    return holder.isRequestSentToServer()
+           && currentTimeMillis - holder.getFirstRequestMillis()
+              > config.getLoadTimeoutDelay().getMillis();
   }
 
-  private abstract class SegmentHolder
+  private void onRequestFailed(SegmentHolder holder, String failureCause)
   {
-    private final DataSegment segment;
-    private final DataSegmentChangeRequest changeRequest;
-    private final List<LoadPeonCallback> callbacks = new ArrayList<>();
-
-    // Time when this request was sent to target server the first time.
-    private volatile long scheduleTime = -1;
-
-    private SegmentHolder(
-        DataSegment segment,
-        DataSegmentChangeRequest changeRequest,
-        LoadPeonCallback callback
-    )
-    {
-      this.segment = segment;
-      this.changeRequest = changeRequest;
+    log.error(
+        "Server[%s] failed segment[%s] request[%s] with cause [%s].",
+        serverId, holder.getSegment().getId(), holder.getAction(), failureCause
+    );
+    onRequestCompleted(holder, SegmentStatusInQueue.FAILED);
+  }
 
-      if (callback != null) {
-        this.callbacks.add(callback);
-      }
-    }
+  private void onRequestCompleted(SegmentHolder holder, SegmentStatusInQueue 
status)
+  {
+    final SegmentAction action = holder.getAction();
+    log.trace(
+        "Server[%s] completed request[%s] on segment[%s] with status[%s].",
+        serverId, action, holder.getSegment().getId(), status
+    );
 
-    public void addCallback(LoadPeonCallback newCallback)
-    {
-      synchronized (callbacks) {
-        if (newCallback != null) {
-          callbacks.add(newCallback);
-        }
-      }
+    if (holder.isLoad()) {
+      queuedSize.addAndGet(-holder.getSegment().getSize());
     }
+    incrementStat(holder, status);
+    executeCallbacks(holder, status == SegmentStatusInQueue.SUCCESS);
+  }
 
-    public DataSegment getSegment()
-    {
-      return segment;
+  private void incrementStat(SegmentHolder holder, SegmentStatusInQueue status)
+  {
+    stats.add(status.getStatForAction(holder.getAction()), 1);
+    if (status.datasourceStat != null) {
+      stats.addToDatasourceStat(status.datasourceStat, 
holder.getSegment().getDataSource(), 1);
     }
+  }
 
-    public DataSegmentChangeRequest getChangeRequest()
-    {
-      return changeRequest;
-    }
+  private void executeCallbacks(SegmentHolder holder, boolean success)
+  {
+    callBackExecutor.execute(() -> {
+      for (LoadPeonCallback callback : holder.getCallbacks()) {
+        callback.execute(success);
+      }
+    });
+  }
 
-    public boolean hasTimedOut()
-    {
-      if (scheduleTime < 0) {
-        scheduleTime = System.currentTimeMillis();
-        return false;
-      } else if (System.currentTimeMillis() - scheduleTime > 
config.getLoadTimeoutDelay().getMillis()) {
-        return true;
-      } else {
+  /**
+   * Tries to cancel a load/drop operation. A load/drop request can be 
cancelled
+   * only if it has not already been sent to the corresponding server.
+   */
+  @Override
+  public boolean cancelOperation(DataSegment segment)
+  {
+    synchronized (lock) {
+      if (activeRequestSegments.contains(segment)) {
         return false;
       }
-    }
-
-    public void requestSucceeded()
-    {
-      log.trace(
-          "Server[%s] Successfully processed segment[%s] request[%s].",
-          serverId,
-          segment.getId(),
-          changeRequest.getClass().getSimpleName()
-      );
-
-      callBackExecutor.execute(() -> {
-        for (LoadPeonCallback callback : callbacks) {
-          if (callback != null) {
-            callback.execute(true);
-          }
-        }
-      });
-    }
 
-    public void requestFailed(String failureCause)
-    {
-      log.error(
-          "Server[%s] Failed segment[%s] request[%s] with cause [%s].",
-          serverId,
-          segment.getId(),
-          changeRequest.getClass().getSimpleName(),
-          failureCause
-      );
-
-      failedAssignCount.getAndIncrement();
+      // Find the action on this segment, if any
+      final SegmentHolder holder = segmentsToLoad.containsKey(segment)
+                                   ? segmentsToLoad.remove(segment)
+                                   : segmentsToDrop.remove(segment);
+      if (holder == null) {
+        return false;
+      }
 
-      callBackExecutor.execute(() -> {
-        for (LoadPeonCallback callback : callbacks) {
-          if (callback != null) {
-            callback.execute(false);
-          }
-        }
-      });
-    }
-
-    @Override
-    public String toString()
-    {
-      return changeRequest.toString();
+      queuedSegments.remove(holder);
+      onRequestCompleted(holder, SegmentStatusInQueue.CANCELLED);
+      return true;
     }
   }
 
-  private class LoadSegmentHolder extends SegmentHolder
+  private enum SegmentStatusInQueue
   {
-    public LoadSegmentHolder(DataSegment segment, LoadPeonCallback callback)
-    {
-      super(segment, new SegmentChangeRequestLoad(segment), callback);
-      queuedSize.addAndGet(segment.getSize());
-    }
+    ASSIGNED(Stats.SegmentQueue.ASSIGNED_ACTIONS),
+    SUCCESS(Stats.SegmentQueue.COMPLETED_ACTIONS),
+    FAILED(Stats.SegmentQueue.FAILED_ACTIONS),
+    CANCELLED(Stats.SegmentQueue.CANCELLED_ACTIONS);
 
-    @Override
-    public void requestSucceeded()
-    {
-      queuedSize.addAndGet(-getSegment().getSize());
-      super.requestSucceeded();
-    }
+    final CoordinatorStat loadStat;
+    final CoordinatorStat moveStat;
+    final CoordinatorStat dropStat;
+    final CoordinatorStat datasourceStat;
 
-    @Override
-    public void requestFailed(String failureCause)
+    SegmentStatusInQueue(CoordinatorStat datasourceStat)
     {
-      queuedSize.addAndGet(-getSegment().getSize());
-      super.requestFailed(failureCause);
+      // These stats are not emitted and are tracked for debugging purposes 
only
+      final String prefix = StringUtils.toLowerCase(name());
+      this.loadStat = new CoordinatorStat(prefix + "Load");
+      this.moveStat = new CoordinatorStat(prefix + "Move");
+      this.dropStat = new CoordinatorStat(prefix + "Drop");
+
+      this.datasourceStat = datasourceStat;
     }
-  }
 
-  private class DropSegmentHolder extends SegmentHolder
-  {
-    public DropSegmentHolder(DataSegment segment, LoadPeonCallback callback)
+    CoordinatorStat getStatForAction(SegmentAction action)
     {
-      super(segment, new SegmentChangeRequestDrop(segment), callback);
+      switch (action) {

Review Comment:
   ## Missing enum case in switch
   
   Switch statement does not have a case for [MOVE_FROM](1).
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/5117)



##########
server/src/test/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.java:
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.loading;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
+import org.apache.druid.server.ServerTestHelper;
+import org.apache.druid.server.coordination.DataSegmentChangeCallback;
+import org.apache.druid.server.coordination.DataSegmentChangeHandler;
+import org.apache.druid.server.coordination.DataSegmentChangeRequest;
+import org.apache.druid.server.coordination.SegmentLoadDropHandler;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
+import 
org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
+import org.apache.druid.timeline.DataSegment;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.joda.time.Duration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ *
+ */
+public class HttpLoadQueuePeonTest
+{
+  private final List<DataSegment> segments =
+      CreateDataSegments.ofDatasource("test")
+                        .forIntervals(1, Granularities.DAY)
+                        .startingAt("2022-01-01")
+                        .withNumPartitions(4)
+                        .eachOfSizeInMb(100);
+
+  private TestHttpClient httpClient;
+  private HttpLoadQueuePeon httpLoadQueuePeon;
+  private BlockingExecutorService processingExecutor;
+  private BlockingExecutorService callbackExecutor;
+
+  private final List<DataSegment> processedSegments = new ArrayList<>();
+
+  @Before
+  public void setUp()
+  {
+    httpClient = new TestHttpClient();
+    processingExecutor = new 
BlockingExecutorService("HttpLoadQueuePeonTest-%s");
+    callbackExecutor = new BlockingExecutorService("HttpLoadQueuePeonTest-cb");
+    processedSegments.clear();
+
+    httpLoadQueuePeon = new HttpLoadQueuePeon(
+        "http://dummy:4000";,
+        ServerTestHelper.MAPPER,
+        httpClient,
+        new TestDruidCoordinatorConfig.Builder()
+            .withHttpLoadQueuePeonBatchSize(10)
+            .build(),
+        new WrappingScheduledExecutorService("HttpLoadQueuePeonTest-%s", 
processingExecutor, true),
+        callbackExecutor
+    );
+    httpLoadQueuePeon.start();
+  }
+
+  @After
+  public void tearDown()
+  {
+    httpLoadQueuePeon.stop();
+  }
+
+  @Test
+  public void testSimple()
+  {
+    httpLoadQueuePeon
+        .dropSegment(segments.get(0), markSegmentProcessed(segments.get(0)));
+    httpLoadQueuePeon
+        .loadSegment(segments.get(1), SegmentAction.LOAD, 
markSegmentProcessed(segments.get(1)));
+    httpLoadQueuePeon
+        .loadSegment(segments.get(2), SegmentAction.REPLICATE, 
markSegmentProcessed(segments.get(2)));
+    httpLoadQueuePeon
+        .loadSegment(segments.get(3), SegmentAction.MOVE_TO, 
markSegmentProcessed(segments.get(3)));
+
+    // Send requests to server
+    processingExecutor.finishAllPendingTasks();
+    Assert.assertEquals(segments, httpClient.segmentsSentToServer);
+
+    // Verify that all callbacks are executed
+    callbackExecutor.finishAllPendingTasks();
+    Assert.assertEquals(segments, processedSegments);
+  }
+
+  @Test
+  public void testLoadDropAfterStop()
+  {
+    // Verify that requests sent after stopping the peon fail immediately
+    httpLoadQueuePeon.stop();
+
+    final Set<DataSegment> failedSegments = new HashSet<>();
+    final DataSegment segment1 = segments.get(0);
+    httpLoadQueuePeon.dropSegment(segment1, success -> {
+      if (!success) {
+        failedSegments.add(segment1);
+      }
+    });
+    final DataSegment segment2 = segments.get(1);
+    httpLoadQueuePeon.loadSegment(segment2, SegmentAction.MOVE_TO, success -> {
+      if (!success) {
+        failedSegments.add(segment2);
+      }
+    });
+
+    Assert.assertTrue(failedSegments.contains(segment1));
+    Assert.assertTrue(failedSegments.contains(segment2));
+  }
+
+  @Test
+  public void testPriorityOfSegmentAction()
+  {
+    // Shuffle the segments for the same day
+    final List<DataSegment> segmentsDay1 = new ArrayList<>(segments);
+    Collections.shuffle(segmentsDay1);
+
+    // Assign segments to the actions in their order of priority
+    // Order: drop, load, replicate, move
+    final List<QueueAction> actions = Arrays.asList(
+        QueueAction.of(segmentsDay1.get(0), s -> 
httpLoadQueuePeon.dropSegment(s, null)),
+        QueueAction.of(segmentsDay1.get(1), s -> 
httpLoadQueuePeon.loadSegment(s, SegmentAction.LOAD, null)),
+        QueueAction.of(segmentsDay1.get(2), s -> 
httpLoadQueuePeon.loadSegment(s, SegmentAction.REPLICATE, null)),
+        QueueAction.of(segmentsDay1.get(3), s -> 
httpLoadQueuePeon.loadSegment(s, SegmentAction.MOVE_TO, null))
+    );
+
+    // Queue the actions on the peon in a random order
+    Collections.shuffle(actions);
+    actions.forEach(QueueAction::invoke);
+
+    // Send one batch of requests to the server
+    processingExecutor.finishAllPendingTasks();
+
+    // Verify that all segments are sent to the server in the expected order
+    Assert.assertEquals(segmentsDay1, httpClient.segmentsSentToServer);
+  }
+
+  @Test
+  public void testPriorityOfSegmentInterval()
+  {
+    // Create 8 segments (4 x 2days) and shuffle them
+    final List<DataSegment> segmentsDay1 = new ArrayList<>(segments);
+    Collections.shuffle(segmentsDay1);
+
+    final List<DataSegment> segmentsDay2 = new ArrayList<>(
+        CreateDataSegments.ofDatasource("test")
+                          .forIntervals(1, Granularities.DAY)
+                          .startingAt("2022-01-02")
+                          .withNumPartitions(4)
+                          .eachOfSizeInMb(100)
+    );
+    Collections.shuffle(segmentsDay2);
+
+    // Assign segments to the actions in their order of priority
+    // Priority order: action (drop, priorityLoad, etc), then interval (new 
then old)
+    List<QueueAction> actions = Arrays.asList(
+        QueueAction.of(segmentsDay2.get(0), s -> 
httpLoadQueuePeon.dropSegment(s, null)),
+        QueueAction.of(segmentsDay1.get(0), s -> 
httpLoadQueuePeon.dropSegment(s, null)),
+        QueueAction.of(segmentsDay2.get(1), s -> 
httpLoadQueuePeon.loadSegment(s, SegmentAction.LOAD, null)),
+        QueueAction.of(segmentsDay1.get(1), s -> 
httpLoadQueuePeon.loadSegment(s, SegmentAction.LOAD, null)),
+        QueueAction.of(segmentsDay2.get(2), s -> 
httpLoadQueuePeon.loadSegment(s, SegmentAction.REPLICATE, null)),
+        QueueAction.of(segmentsDay1.get(2), s -> 
httpLoadQueuePeon.loadSegment(s, SegmentAction.REPLICATE, null)),
+        QueueAction.of(segmentsDay2.get(3), s -> 
httpLoadQueuePeon.loadSegment(s, SegmentAction.MOVE_TO, null)),
+        QueueAction.of(segmentsDay1.get(3), s -> 
httpLoadQueuePeon.loadSegment(s, SegmentAction.MOVE_TO, null))
+    );
+    final List<DataSegment> expectedSegmentOrder =
+        actions.stream().map(a -> a.segment).collect(Collectors.toList());
+
+    // Queue the actions on the peon in a random order
+    Collections.shuffle(actions);
+    actions.forEach(QueueAction::invoke);
+
+    // Send one batch of requests to the server
+    processingExecutor.finishNextPendingTask();
+
+    // Verify that all segments are sent to the server in the expected order
+    Assert.assertEquals(expectedSegmentOrder, httpClient.segmentsSentToServer);
+  }
+
+  @Test
+  public void testCancelLoad()
+  {
+    final DataSegment segment = segments.get(0);
+    httpLoadQueuePeon.loadSegment(segment, SegmentAction.REPLICATE, 
markSegmentProcessed(segment));
+    Assert.assertEquals(1, httpLoadQueuePeon.getSegmentsToLoad().size());
+
+    boolean cancelled = httpLoadQueuePeon.cancelOperation(segment);
+    Assert.assertTrue(cancelled);
+    Assert.assertEquals(0, httpLoadQueuePeon.getSegmentsToLoad().size());
+
+    Assert.assertTrue(processedSegments.isEmpty());
+  }
+
+  @Test
+  public void testCancelDrop()
+  {
+    final DataSegment segment = segments.get(0);
+    httpLoadQueuePeon.dropSegment(segment, markSegmentProcessed(segment));
+    Assert.assertEquals(1, httpLoadQueuePeon.getSegmentsToDrop().size());
+
+    boolean cancelled = httpLoadQueuePeon.cancelOperation(segment);
+    Assert.assertTrue(cancelled);
+    Assert.assertTrue(httpLoadQueuePeon.getSegmentsToDrop().isEmpty());
+
+    Assert.assertTrue(processedSegments.isEmpty());
+  }
+
+  @Test
+  public void testCannotCancelRequestSentToServer()
+  {
+    final DataSegment segment = segments.get(0);
+    httpLoadQueuePeon.loadSegment(segment, SegmentAction.REPLICATE, 
markSegmentProcessed(segment));
+    Assert.assertTrue(httpLoadQueuePeon.getSegmentsToLoad().contains(segment));
+
+    // Send the request to the server
+    processingExecutor.finishNextPendingTask();
+    Assert.assertTrue(httpClient.segmentsSentToServer.contains(segment));
+
+    // Segment is still in queue but operation cannot be cancelled
+    Assert.assertTrue(httpLoadQueuePeon.getSegmentsToLoad().contains(segment));
+    boolean cancelled = httpLoadQueuePeon.cancelOperation(segment);
+    Assert.assertFalse(cancelled);
+
+    // Handle response from server
+    processingExecutor.finishNextPendingTask();
+
+    // Segment has been removed from queue
+    Assert.assertTrue(httpLoadQueuePeon.getSegmentsToLoad().isEmpty());
+    cancelled = httpLoadQueuePeon.cancelOperation(segment);
+    Assert.assertFalse(cancelled);
+
+    // Execute callbacks and verify segment is fully processed
+    callbackExecutor.finishAllPendingTasks();
+    Assert.assertTrue(processedSegments.contains(segment));
+  }
+
+  @Test
+  public void testCannotCancelOperationMultipleTimes()
+  {
+    final DataSegment segment = segments.get(0);
+    httpLoadQueuePeon.loadSegment(segment, SegmentAction.REPLICATE, 
markSegmentProcessed(segment));
+    Assert.assertTrue(httpLoadQueuePeon.getSegmentsToLoad().contains(segment));
+
+    Assert.assertTrue(httpLoadQueuePeon.cancelOperation(segment));
+    Assert.assertFalse(httpLoadQueuePeon.cancelOperation(segment));
+  }
+
+  private LoadPeonCallback markSegmentProcessed(DataSegment segment)
+  {
+    return success -> processedSegments.add(segment);
+  }
+
+  private static class TestHttpClient implements HttpClient, 
DataSegmentChangeHandler
+  {
+    private final List<DataSegment> segmentsSentToServer = new ArrayList<>();
+
+    @Override
+    public <Intermediate, Final> ListenableFuture<Final> go(
+        Request request,
+        HttpResponseHandler<Intermediate, Final> httpResponseHandler
+    )
+    {
+      throw new UnsupportedOperationException("Not Implemented.");
+    }
+
+    @Override
+    public <Intermediate, Final> ListenableFuture<Final> go(
+        Request request,
+        HttpResponseHandler<Intermediate, Final> httpResponseHandler,
+        Duration duration
+    )
+    {
+      HttpResponse httpResponse = new 
DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+      httpResponse.setContent(ChannelBuffers.buffer(0));
+      httpResponseHandler.handleResponse(httpResponse, null);
+      try {
+        List<DataSegmentChangeRequest> changeRequests = 
ServerTestHelper.MAPPER.readValue(
+            request.getContent().array(), new 
TypeReference<List<DataSegmentChangeRequest>>()
+            {
+            }
+        );
+
+        List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> 
statuses = new ArrayList<>(changeRequests.size());
+        for (DataSegmentChangeRequest cr : changeRequests) {
+          cr.go(this, null);
+          statuses.add(new 
SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus(
+              cr,
+              SegmentLoadDropHandler.Status.SUCCESS
+          ));
+        }
+        return (ListenableFuture) Futures.immediateFuture(
+            new ByteArrayInputStream(
+                ServerTestHelper.MAPPER
+                    .writerWithType(HttpLoadQueuePeon.RESPONSE_ENTITY_TYPE_REF)

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [ObjectMapper.writerWithType](1) should be avoided because it has 
been deprecated.
   
   [Show more 
details](https://github.com/apache/druid/security/code-scanning/5116)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to