kfaraz commented on code in PR #13197:
URL: https://github.com/apache/druid/pull/13197#discussion_r1231753246


##########
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java:
##########
@@ -888,26 +799,101 @@ public DruidCoordinatorRuntimeParams 
run(DruidCoordinatorRuntimeParams params)
       List<ImmutableDruidServer> currentServers = prepareCurrentServers();
 
       startPeonsForNewServers(currentServers);
+      stopPeonsForDisappearedServers(currentServers);
 
-      cluster = prepareCluster(params, currentServers);
-      segmentReplicantLookup = SegmentReplicantLookup.make(cluster, 
getDynamicConfigs().getReplicateAfterLoadTimeout());
+      final DruidCluster cluster = 
prepareCluster(params.getCoordinatorDynamicConfig(), currentServers);
+      cancelLoadsOnDecommissioningServers(cluster);
 
-      stopPeonsForDisappearedServers(currentServers);
+      final CoordinatorDynamicConfig dynamicConfig = 
params.getCoordinatorDynamicConfig();
 
-      final RoundRobinServerSelector roundRobinServerSelector;
-      if 
(params.getCoordinatorDynamicConfig().isUseRoundRobinSegmentAssignment()) {
-        roundRobinServerSelector = new RoundRobinServerSelector(cluster);
-        log.info("Using round-robin segment assignment.");
-      } else {
-        roundRobinServerSelector = null;
+      initBalancerExecutor();
+      final BalancerStrategy balancerStrategy = 
balancerStrategyFactory.createBalancerStrategy(balancerExec);
+      log.info(
+          "Using balancer strategy [%s] with round-robin assignment [%s] and 
debug dimensions [%s].",
+          balancerStrategy.getClass().getSimpleName(),
+          dynamicConfig.isUseRoundRobinSegmentAssignment(), 
dynamicConfig.getDebugDimensions()
+      );
+
+      params = params.buildFromExisting()
+                     .withDruidCluster(cluster)
+                     .withDynamicConfigs(recomputeDynamicConfig(params))
+                     .withBalancerStrategy(balancerStrategy)
+                     .withSegmentAssignerUsing(loadQueueManager)
+                     .build();
+
+      segmentReplicantLookup = params.getSegmentReplicantLookup();
+
+      return params;
+    }
+
+    /**
+     * Recomputes dynamic config values if {@code smartLoadQueue} is enabled.
+     */
+    private CoordinatorDynamicConfig 
recomputeDynamicConfig(DruidCoordinatorRuntimeParams params)
+    {
+      final CoordinatorDynamicConfig dynamicConfig = 
params.getCoordinatorDynamicConfig();
+      if (!dynamicConfig.isSmartSegmentLoading()) {
+        return dynamicConfig;
       }
 
-      return params.buildFromExisting()
-                   .withDruidCluster(cluster)
-                   .withLoadManagementPeons(loadManagementPeons)
-                   .withSegmentReplicantLookup(segmentReplicantLookup)
-                   .withRoundRobinServerSelector(roundRobinServerSelector)
-                   .build();
+      // Impose a lower bound on both replicationThrottleLimit and 
maxSegmentsToMove
+      final int throttlePercentage = 2;
+      final int replicationThrottleLimit = Math.max(
+          100,
+          params.getUsedSegments().size() * throttlePercentage / 100
+      );
+
+      // Impose an upper bound on maxSegmentsToMove to ensure that coordinator
+      // run times are bounded. This limit can be relaxed as performance of
+      // the CostBalancerStrategy.computeCost() is improved.
+      final int maxSegmentsToMove = Math.min(1000, replicationThrottleLimit);
+
+      log.info(
+          "Smart segment loading is enabled. Recomputed 
replicationThrottleLimit"
+          + " [%d] (%d%% of used segments) and maxSegmentsToMove [%d].",
+          replicationThrottleLimit, throttlePercentage, maxSegmentsToMove
+      );
+
+      return CoordinatorDynamicConfig.builder()
+                                     .withMaxSegmentsInNodeLoadingQueue(0)
+                                     
.withReplicationThrottleLimit(replicationThrottleLimit)
+                                     .withMaxSegmentsToMove(maxSegmentsToMove)
+                                     .withUseRoundRobinSegmentAssignment(true)
+                                     .withUseBatchedSegmentSampler(true)
+                                     .withEmitBalancingStats(false)
+                                     .build(dynamicConfig);
+    }
+
+    /**
+     * Cancels all load/move operations on decommissioning servers. This should
+     * be done before initializing the SegmentReplicantLookup so that
+     * under-replicated segments can be assigned in the current run itself.
+     */
+    private void cancelLoadsOnDecommissioningServers(DruidCluster cluster)
+    {
+      final AtomicInteger cancelledCount = new AtomicInteger(0);
+      final List<ServerHolder> decommissioningServers
+          = cluster.getAllServers().stream()
+                   .filter(ServerHolder::isDecommissioning)
+                   .collect(Collectors.toList());
+
+      for (ServerHolder server : decommissioningServers) {
+        server.getQueuedSegments().forEach(
+            (segment, action) -> {
+              // Cancel the operation if it is a type of load
+              if (action.isLoad() && server.cancelOperation(action, segment)) {
+                cancelledCount.incrementAndGet();
+              }
+            }
+        );
+      }
+
+      if (cancelledCount.get() > 0) {
+        log.info(
+            "Cancelled [%d] load/move operations on [%d] decommissioning 
servers.",
+            cancelledCount.get(), decommissioningServers.size()
+        );
+      }
     }

Review Comment:
   We do this cancellation in this duty (rather than in `RunRules` or 
`BalanceSegments`) because the `SegmentReplicantLookup` is constructed right 
after this method. As a result, the Coordinator learns in this same run that 
some segments are under-replicated and can queue up loads for them on active 
servers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to