kfaraz commented on code in PR #13197:
URL: https://github.com/apache/druid/pull/13197#discussion_r1231750050


##########
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java:
##########
@@ -888,26 +799,101 @@ public DruidCoordinatorRuntimeParams 
run(DruidCoordinatorRuntimeParams params)
       List<ImmutableDruidServer> currentServers = prepareCurrentServers();
 
       startPeonsForNewServers(currentServers);
+      stopPeonsForDisappearedServers(currentServers);
 
-      cluster = prepareCluster(params, currentServers);
-      segmentReplicantLookup = SegmentReplicantLookup.make(cluster, 
getDynamicConfigs().getReplicateAfterLoadTimeout());
+      final DruidCluster cluster = 
prepareCluster(params.getCoordinatorDynamicConfig(), currentServers);
+      cancelLoadsOnDecommissioningServers(cluster);
 
-      stopPeonsForDisappearedServers(currentServers);
+      final CoordinatorDynamicConfig dynamicConfig = 
params.getCoordinatorDynamicConfig();
 
-      final RoundRobinServerSelector roundRobinServerSelector;
-      if 
(params.getCoordinatorDynamicConfig().isUseRoundRobinSegmentAssignment()) {
-        roundRobinServerSelector = new RoundRobinServerSelector(cluster);
-        log.info("Using round-robin segment assignment.");
-      } else {
-        roundRobinServerSelector = null;
+      initBalancerExecutor();
+      final BalancerStrategy balancerStrategy = 
balancerStrategyFactory.createBalancerStrategy(balancerExec);
+      log.info(
+          "Using balancer strategy [%s] with round-robin assignment [%s] and 
debug dimensions [%s].",
+          balancerStrategy.getClass().getSimpleName(),
+          dynamicConfig.isUseRoundRobinSegmentAssignment(), 
dynamicConfig.getDebugDimensions()
+      );
+
+      params = params.buildFromExisting()
+                     .withDruidCluster(cluster)
+                     .withDynamicConfigs(recomputeDynamicConfig(params))
+                     .withBalancerStrategy(balancerStrategy)
+                     .withSegmentAssignerUsing(loadQueueManager)
+                     .build();
+
+      segmentReplicantLookup = params.getSegmentReplicantLookup();
+
+      return params;
+    }
+
+    /**
+     * Recomputes dynamic config values if {@code smartLoadQueue} is enabled.
+     */
+    private CoordinatorDynamicConfig 
recomputeDynamicConfig(DruidCoordinatorRuntimeParams params)
+    {
+      final CoordinatorDynamicConfig dynamicConfig = 
params.getCoordinatorDynamicConfig();
+      if (!dynamicConfig.isSmartSegmentLoading()) {
+        return dynamicConfig;
       }
 
-      return params.buildFromExisting()
-                   .withDruidCluster(cluster)
-                   .withLoadManagementPeons(loadManagementPeons)
-                   .withSegmentReplicantLookup(segmentReplicantLookup)
-                   .withRoundRobinServerSelector(roundRobinServerSelector)
-                   .build();
+      // Impose a lower bound on both replicationThrottleLimit and 
maxSegmentsToMove
+      final int throttlePercentage = 2;
+      final int replicationThrottleLimit = Math.max(
+          100,
+          params.getUsedSegments().size() * throttlePercentage / 100
+      );
+
+      // Impose an upper bound on maxSegmentsToMove to ensure that coordinator
+      // run times are bounded. This limit can be relaxed as performance of
+      // the CostBalancerStrategy.computeCost() is improved.
+      final int maxSegmentsToMove = Math.min(1000, replicationThrottleLimit);
+
+      log.info(
+          "Smart segment loading is enabled. Recomputed 
replicationThrottleLimit"
+          + " [%d] (%d%% of used segments) and maxSegmentsToMove [%d].",
+          replicationThrottleLimit, throttlePercentage, maxSegmentsToMove
+      );
+
+      return CoordinatorDynamicConfig.builder()
+                                     .withMaxSegmentsInNodeLoadingQueue(0)
+                                     
.withReplicationThrottleLimit(replicationThrottleLimit)
+                                     .withMaxSegmentsToMove(maxSegmentsToMove)
+                                     .withUseRoundRobinSegmentAssignment(true)
+                                     .withUseBatchedSegmentSampler(true)
+                                     .withEmitBalancingStats(false)

Review Comment:
   Yeah, I was not too keen on this myself. I considered putting this logic 
inside `DruidCoordinatorRuntimeParams` itself but I guess the best thing to do 
is just have a separate config object, as you suggest.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to