kfaraz commented on code in PR #14584:
URL: https://github.com/apache/druid/pull/14584#discussion_r1295360138
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java:
##########
@@ -43,27 +47,79 @@ public DruidCoordinatorRuntimeParams
run(DruidCoordinatorRuntimeParams params)
final DruidCluster cluster = params.getDruidCluster();
final SegmentLoadingConfig loadingConfig =
params.getSegmentLoadingConfig();
- final int maxSegmentsToMove = loadingConfig.getMaxSegmentsToMove();
+
+ final int maxSegmentsToMove = getMaxSegmentsToMove(params);
if (maxSegmentsToMove <= 0) {
log.info("Skipping balance as maxSegmentsToMove is [%d].",
maxSegmentsToMove);
return params;
} else {
log.info(
- "Balancing segments in tiers [%s] with maxSegmentsToMove=[%d],
maxLifetime=[%d].",
+ "Balancing segments in tiers [%s] with maxSegmentsToMove[%,d] and
maxLifetime[%d].",
cluster.getTierNames(), maxSegmentsToMove,
loadingConfig.getMaxLifetimeInLoadQueue()
);
}
cluster.getHistoricals().forEach(
- (tier, servers) -> new TierSegmentBalancer(tier, servers, params).run()
+ (tier, servers) -> new TierSegmentBalancer(tier, servers,
maxSegmentsToMove, params).run()
);
CoordinatorRunStats runStats = params.getCoordinatorStats();
params.getBalancerStrategy()
- .getAndResetStats()
+ .getStats()
.forEachStat(runStats::add);
return params;
}
+ /**
+ * Recomputes the value of {@code maxSegmentsToMove} if smart segment loading
+ * is enabled. {@code maxSegmentsToMove} defines only the upper bound; the actual
+ * number of segments picked for moving is determined by the
+ * {@link TierSegmentBalancer} based on the level of skew in the tier.
+ */
+ private int getMaxSegmentsToMove(DruidCoordinatorRuntimeParams params)
+ {
+ final CoordinatorDynamicConfig dynamicConfig =
params.getCoordinatorDynamicConfig();
+ if (dynamicConfig.isSmartSegmentLoading()) {
+ final int totalSegmentsInCluster =
getTotalSegmentsOnHistoricals(params.getDruidCluster());
+ final int numHistoricals = getNumHistoricals(params.getDruidCluster());
+ final int numBalancerThreads =
params.getSegmentLoadingConfig().getBalancerComputeThreads();
+ final int maxSegmentsToMove =
SegmentToMoveCalculator.computeMaxSegmentsToMovePerTier(
+ totalSegmentsInCluster,
+ numBalancerThreads
+ );
+ log.info(
+ "Computed maxSegmentsToMove[%,d] for total [%,d] segments on [%d]
historicals.",
+ maxSegmentsToMove, totalSegmentsInCluster, numHistoricals
+ );
+
+ return maxSegmentsToMove;
+ } else {
+ return dynamicConfig.getMaxSegmentsToMove();
+ }
+ }
+
+ /**
+ * Total number of all segments in the cluster that would participate in cost
+ * computations. This includes all replicas of all loaded, loading, dropping
+ * and moving segments across all historicals (active and decommissioning).
+ * <p>
+ * This is calculated here to ensure that all assignments done by the preceding
+ * {@link RunRules} duty are accounted for.
+ */
+ private int getTotalSegmentsOnHistoricals(DruidCluster cluster)
+ {
+ return cluster.getHistoricals().values().stream()
+ .flatMap(Collection::stream)
+ .mapToInt(server -> server.getServer().getNumSegments() +
server.getNumQueuedSegments())
+ .sum();
+ }
+
+ private int getNumHistoricals(DruidCluster cluster)
+ {
+ return cluster.getHistoricals().values().stream()
+ .mapToInt(Collection::size)
+ .sum();
+ }
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]