clintropolis commented on code in PR #14584:
URL: https://github.com/apache/druid/pull/14584#discussion_r1294005717
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java:
##########
@@ -43,27 +47,79 @@ public DruidCoordinatorRuntimeParams
run(DruidCoordinatorRuntimeParams params)
final DruidCluster cluster = params.getDruidCluster();
final SegmentLoadingConfig loadingConfig =
params.getSegmentLoadingConfig();
- final int maxSegmentsToMove = loadingConfig.getMaxSegmentsToMove();
+
+ final int maxSegmentsToMove = getMaxSegmentsToMove(params);
if (maxSegmentsToMove <= 0) {
log.info("Skipping balance as maxSegmentsToMove is [%d].",
maxSegmentsToMove);
return params;
} else {
log.info(
- "Balancing segments in tiers [%s] with maxSegmentsToMove=[%d],
maxLifetime=[%d].",
+ "Balancing segments in tiers [%s] with maxSegmentsToMove[%,d] and
maxLifetime[%d].",
cluster.getTierNames(), maxSegmentsToMove,
loadingConfig.getMaxLifetimeInLoadQueue()
);
}
cluster.getHistoricals().forEach(
- (tier, servers) -> new TierSegmentBalancer(tier, servers, params).run()
+ (tier, servers) -> new TierSegmentBalancer(tier, servers,
maxSegmentsToMove, params).run()
);
CoordinatorRunStats runStats = params.getCoordinatorStats();
params.getBalancerStrategy()
- .getAndResetStats()
+ .getStats()
.forEachStat(runStats::add);
return params;
}
+ /**
+ * Recomputes the value of {@code maxSegmentsToMove} if smart segment loading
+ * is enabled. {@code maxSegmentsToMove} defines only the upper bound, the
actual
+ * number of segments picked for moving is determined by the {@link
TierSegmentBalancer}
+ * based on the level of skew in the tier.
+ */
+ private int getMaxSegmentsToMove(DruidCoordinatorRuntimeParams params)
+ {
+ final CoordinatorDynamicConfig dynamicConfig =
params.getCoordinatorDynamicConfig();
+ if (dynamicConfig.isSmartSegmentLoading()) {
+ final int totalSegmentsInCluster =
getTotalSegmentsOnHistoricals(params.getDruidCluster());
+ final int numHistoricals = getNumHistoricals(params.getDruidCluster());
+ final int numBalancerThreads =
params.getSegmentLoadingConfig().getBalancerComputeThreads();
+ final int maxSegmentsToMove =
SegmentToMoveCalculator.computeMaxSegmentsToMovePerTier(
+ totalSegmentsInCluster,
+ numBalancerThreads
+ );
+ log.info(
+ "Computed maxSegmentsToMove[%,d] for total [%,d] segments on [%d]
historicals.",
+ maxSegmentsToMove, totalSegmentsInCluster, numHistoricals
+ );
+
+ return maxSegmentsToMove;
+ } else {
+ return dynamicConfig.getMaxSegmentsToMove();
+ }
+ }
+
+ /**
+ * Total number of all segments in the cluster that would participate in cost
+ * computations. This includes all replicas of all loaded, loading, dropping
+ * and moving segments across all historicals (active and decommissioning).
+ * <p>
+ * This is calculated here to ensure that all assignments done by the
preceding
+ * {@link RunRules} duty are accounted for.
+ */
+ private int getTotalSegmentsOnHistoricals(DruidCluster cluster)
+ {
+ return cluster.getHistoricals().values().stream()
+ .flatMap(Collection::stream)
+ .mapToInt(server -> server.getServer().getNumSegments() +
server.getNumQueuedSegments())
+ .sum();
+ }
+
+ private int getNumHistoricals(DruidCluster cluster)
+ {
+ return cluster.getHistoricals().values().stream()
+ .mapToInt(Collection::size)
+ .sum();
+ }
Review Comment:
Nit: it might be cheaper for very large clusters to compute these two counts
together in a single pass.
##########
server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.balancer;
+
+import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.SegmentCountsPerInterval;
+import org.apache.druid.server.coordinator.ServerHolder;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Calculates the maximum, minimum and required number of segments to move in a
+ * Coordinator run for balancing.
+ */
+public class SegmentToMoveCalculator
+{
+ /**
+ * At least this number of segments must be picked for moving in every cycle
+ * to keep the cluster well balanced.
+ */
+ private static final int MIN_SEGMENTS_TO_MOVE = 100;
+
+ private static final Logger log = new Logger(SegmentToMoveCalculator.class);
+
+ /**
+ * Calculates the number of segments to be picked for moving in the given
tier,
+ * based on the level of skew between the historicals in the tier.
+ *
+ * @param tier Name of tier used for logging purposes
+ * @param historicals Active historicals in tier
+ * @param maxSegmentsToMoveInTier Maximum number of segments allowed to be
moved
+ * in the tier.
+ * @return Number of segments to move in the tier in the range
+ * [{@link #MIN_SEGMENTS_TO_MOVE}, {@code maxSegmentsToMoveInTier}].
+ */
+ public static int computeNumSegmentsToMoveInTier(
+ String tier,
+ List<ServerHolder> historicals,
+ int maxSegmentsToMoveInTier
+ )
+ {
+ final int totalSegments = historicals.stream().mapToInt(
+ server -> server.getProjectedSegments().getTotalSegmentCount()
+ ).sum();
+
+ // Move at least some segments to ensure that the cluster is always
balancing itself
+ final int minSegmentsToMove = SegmentToMoveCalculator
+ .computeMinSegmentsToMoveInTier(totalSegments);
+ final int segmentsToMoveToFixDeviation = SegmentToMoveCalculator
+ .computeNumSegmentsToMoveToBalanceTier(tier, historicals);
+ log.info(
+ "Need to move [%,d] segments in tier[%s] to attain balance. Allowed
values are [min=%d, max=%d].",
+ segmentsToMoveToFixDeviation, tier, minSegmentsToMove,
maxSegmentsToMoveInTier
+ );
+
+ final int activeSegmentsToMove = Math.max(minSegmentsToMove,
segmentsToMoveToFixDeviation);
+ return Math.min(activeSegmentsToMove, maxSegmentsToMoveInTier);
+ }
+
+ /**
+ * Calculates the minimum number of segments that should be considered for
+ * moving in a tier, so that the cluster is always balancing itself.
+ * <p>
+ * This value must be calculated separately for every tier.
+ *
+ * @param totalSegmentsInTier Total number of all replicas of all segments
+ * loaded or queued across all historicals in the
tier.
+ * @return {@code minSegmentsToMoveInTier} in the range
+ * [{@link #MIN_SEGMENTS_TO_MOVE}, {@code ~0.6% of totalSegmentsInTier}].
+ */
+ public static int computeMinSegmentsToMoveInTier(int totalSegmentsInTier)
+ {
+ // Divide by 2^14 and multiply by 100 so that the value increases
+ // in steps of 100 for every 2^14 = ~16k segments
+ int upperBound = (totalSegmentsInTier >> 14) * 100;
+ int lowerBound = Math.min(MIN_SEGMENTS_TO_MOVE, totalSegmentsInTier);
+ return Math.max(lowerBound, upperBound);
+ }
+
+ /**
+ * Calculates the maximum number of segments that can be picked for moving in
+ * the cluster in a single coordinator run, assuming that the run must finish
+ * within 40s. A typical coordinator run period is 1 minute and there should
+ * be a buffer of 20s for other coordinator duties.
Review Comment:
Should this method accept the coordinator run period so it can adjust
accordingly, instead of just assuming the default is used?
##########
server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.balancer;
+
+import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.SegmentCountsPerInterval;
+import org.apache.druid.server.coordinator.ServerHolder;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Calculates the maximum, minimum and required number of segments to move in a
+ * Coordinator run for balancing.
+ */
+public class SegmentToMoveCalculator
+{
+ /**
+ * At least this number of segments must be picked for moving in every cycle
+ * to keep the cluster well balanced.
+ */
+ private static final int MIN_SEGMENTS_TO_MOVE = 100;
+
+ private static final Logger log = new Logger(SegmentToMoveCalculator.class);
+
+ /**
+ * Calculates the number of segments to be picked for moving in the given
tier,
+ * based on the level of skew between the historicals in the tier.
+ *
+ * @param tier Name of tier used for logging purposes
+ * @param historicals Active historicals in tier
+ * @param maxSegmentsToMoveInTier Maximum number of segments allowed to be
moved
+ * in the tier.
+ * @return Number of segments to move in the tier in the range
+ * [{@link #MIN_SEGMENTS_TO_MOVE}, {@code maxSegmentsToMoveInTier}].
+ */
+ public static int computeNumSegmentsToMoveInTier(
+ String tier,
+ List<ServerHolder> historicals,
+ int maxSegmentsToMoveInTier
+ )
+ {
+ final int totalSegments = historicals.stream().mapToInt(
+ server -> server.getProjectedSegments().getTotalSegmentCount()
+ ).sum();
+
+ // Move at least some segments to ensure that the cluster is always
balancing itself
+ final int minSegmentsToMove = SegmentToMoveCalculator
+ .computeMinSegmentsToMoveInTier(totalSegments);
+ final int segmentsToMoveToFixDeviation = SegmentToMoveCalculator
+ .computeNumSegmentsToMoveToBalanceTier(tier, historicals);
+ log.info(
+ "Need to move [%,d] segments in tier[%s] to attain balance. Allowed
values are [min=%d, max=%d].",
+ segmentsToMoveToFixDeviation, tier, minSegmentsToMove,
maxSegmentsToMoveInTier
+ );
+
+ final int activeSegmentsToMove = Math.max(minSegmentsToMove,
segmentsToMoveToFixDeviation);
+ return Math.min(activeSegmentsToMove, maxSegmentsToMoveInTier);
+ }
+
+ /**
+ * Calculates the minimum number of segments that should be considered for
+ * moving in a tier, so that the cluster is always balancing itself.
+ * <p>
+ * This value must be calculated separately for every tier.
+ *
+ * @param totalSegmentsInTier Total number of all replicas of all segments
+ * loaded or queued across all historicals in the
tier.
+ * @return {@code minSegmentsToMoveInTier} in the range
+ * [{@link #MIN_SEGMENTS_TO_MOVE}, {@code ~0.6% of totalSegmentsInTier}].
+ */
+ public static int computeMinSegmentsToMoveInTier(int totalSegmentsInTier)
+ {
+ // Divide by 2^14 and multiply by 100 so that the value increases
+ // in steps of 100 for every 2^14 = ~16k segments
+ int upperBound = (totalSegmentsInTier >> 14) * 100;
+ int lowerBound = Math.min(MIN_SEGMENTS_TO_MOVE, totalSegmentsInTier);
+ return Math.max(lowerBound, upperBound);
+ }
Review Comment:
I'm a little bit conflicted/worried about this for large clusters. On the
one hand, it is good to consider a larger number of segments to seek to achieve
balance, but IIRC data transfer costs from deep storage, while super cheap, are
not actually free, so moves that barely make a difference could end up causing
needless churn.
I wonder if we need some sort of "inertia" threshold where, if the cost of
moving a segment isn't "better enough", we skip moving that segment. That way
we can still consider a larger number of segments, but not necessarily make
every move. I suppose something like this would be balancer-specific, since it
probably only really makes sense for the cost balancer, but it still might be
worth considering.
##########
server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.balancer;
+
+import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.SegmentCountsPerInterval;
+import org.apache.druid.server.coordinator.ServerHolder;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Calculates the maximum, minimum and required number of segments to move in a
+ * Coordinator run for balancing.
+ */
+public class SegmentToMoveCalculator
+{
+ /**
+ * At least this number of segments must be picked for moving in every cycle
+ * to keep the cluster well balanced.
+ */
+ private static final int MIN_SEGMENTS_TO_MOVE = 100;
+
+ private static final Logger log = new Logger(SegmentToMoveCalculator.class);
+
+ /**
+ * Calculates the number of segments to be picked for moving in the given
tier,
+ * based on the level of skew between the historicals in the tier.
+ *
+ * @param tier Name of tier used for logging purposes
+ * @param historicals Active historicals in tier
+ * @param maxSegmentsToMoveInTier Maximum number of segments allowed to be
moved
+ * in the tier.
+ * @return Number of segments to move in the tier in the range
+ * [{@link #MIN_SEGMENTS_TO_MOVE}, {@code maxSegmentsToMoveInTier}].
+ */
+ public static int computeNumSegmentsToMoveInTier(
+ String tier,
+ List<ServerHolder> historicals,
+ int maxSegmentsToMoveInTier
+ )
+ {
+ final int totalSegments = historicals.stream().mapToInt(
+ server -> server.getProjectedSegments().getTotalSegmentCount()
+ ).sum();
+
+ // Move at least some segments to ensure that the cluster is always
balancing itself
+ final int minSegmentsToMove = SegmentToMoveCalculator
+ .computeMinSegmentsToMoveInTier(totalSegments);
+ final int segmentsToMoveToFixDeviation = SegmentToMoveCalculator
+ .computeNumSegmentsToMoveToBalanceTier(tier, historicals);
+ log.info(
+ "Need to move [%,d] segments in tier[%s] to attain balance. Allowed
values are [min=%d, max=%d].",
+ segmentsToMoveToFixDeviation, tier, minSegmentsToMove,
maxSegmentsToMoveInTier
+ );
+
+ final int activeSegmentsToMove = Math.max(minSegmentsToMove,
segmentsToMoveToFixDeviation);
+ return Math.min(activeSegmentsToMove, maxSegmentsToMoveInTier);
+ }
+
+ /**
+ * Calculates the minimum number of segments that should be considered for
+ * moving in a tier, so that the cluster is always balancing itself.
+ * <p>
+ * This value must be calculated separately for every tier.
+ *
+ * @param totalSegmentsInTier Total number of all replicas of all segments
+ * loaded or queued across all historicals in the
tier.
+ * @return {@code minSegmentsToMoveInTier} in the range
+ * [{@link #MIN_SEGMENTS_TO_MOVE}, {@code ~0.6% of totalSegmentsInTier}].
+ */
+ public static int computeMinSegmentsToMoveInTier(int totalSegmentsInTier)
+ {
+ // Divide by 2^14 and multiply by 100 so that the value increases
+ // in steps of 100 for every 2^14 = ~16k segments
+ int upperBound = (totalSegmentsInTier >> 14) * 100;
+ int lowerBound = Math.min(MIN_SEGMENTS_TO_MOVE, totalSegmentsInTier);
+ return Math.max(lowerBound, upperBound);
+ }
+
+ /**
+ * Calculates the maximum number of segments that can be picked for moving in
+ * the cluster in a single coordinator run, assuming that the run must finish
+ * within 40s. A typical coordinator run period is 1 minute and there should
+ * be a buffer of 20s for other coordinator duties.
+ * <p>
+ * This value must be calculated at the cluster level and then applied
+ * to every tier so that the total computation time is estimated correctly.
+ * <p>
+ * Each balancer thread can perform 2 billion computations in 40s (see
#14584).
+ * Therefore,
+ * <pre>
+ * numComputations = maxSegmentsToMove * totalSegments
+ *
+ * maxSegmentsToMove = numComputations / totalSegments
+ * = (nThreads * 2B) / totalSegments
+ * </pre>
+ *
+ * @param totalSegments Total number of all replicas of all segments loaded
or
+ * queued across all historicals in the cluster.
+ * @return {@code maxSegmentsToMove} per tier in the range
+ * [{@link #MIN_SEGMENTS_TO_MOVE}, ~20% of totalSegments].
+ * @see <a href="https://github.com/apache/druid/pull/14584">#14584</a>
+ */
+ public static int computeMaxSegmentsToMovePerTier(
+ int totalSegments,
+ int numBalancerThreads
+ )
+ {
+ Preconditions.checkArgument(
+ numBalancerThreads > 0 && numBalancerThreads <= 100,
+ "Number of balancer threads must be in range (0, 100]."
+ );
+ if (totalSegments <= 0) {
+ return 0;
+ }
+
+ // Divide by 2^9 and multiply by 100 so that the upperBound
+ // increases in steps of 100 for every 2^9 = 512 segments (~20%)
+ final int upperBound = (totalSegments >> 9) * 100;
+ final int lowerBound = MIN_SEGMENTS_TO_MOVE;
+
+ // Each thread can do ~2B computations in one cycle = 2M * 1k = 2^21 * 1k
+ int maxComputationsInThousands = numBalancerThreads << 21;
+ int maxSegmentsToMove = (maxComputationsInThousands / totalSegments) *
1000;
+
+ if (upperBound < lowerBound) {
+ return Math.min(lowerBound, totalSegments);
+ } else {
+ return Math.min(maxSegmentsToMove, upperBound);
+ }
+ }
+
+ /**
+ * Computes the number of segments that need to be moved across the
historicals
+ * in a tier to attain balance in terms of disk usage and segment counts per
+ * data source.
+ *
+ * @param tier Name of the tier used only for logging purposes
+ * @param historicals List of historicals in the tier
+ */
+ public static int computeNumSegmentsToMoveToBalanceTier(String tier,
List<ServerHolder> historicals)
+ {
+ if (historicals.isEmpty()) {
+ return 0;
+ }
+
+ return Math.max(
+ computeSegmentsToMoveToBalanceCountsPerDatasource(tier, historicals),
+ computeSegmentsToMoveToBalanceDiskUsage(tier, historicals)
+ );
+ }
+
+ private static long getTotalUsageBytes(List<ServerHolder> servers)
+ {
+ return servers.stream()
+ .mapToLong(server ->
server.getProjectedSegments().getTotalSegmentBytes())
+ .sum();
+ }
+
+ private static int getTotalSegmentCount(List<ServerHolder> servers)
+ {
+ return servers.stream()
+ .mapToInt(server ->
server.getProjectedSegments().getTotalSegmentCount())
+ .sum();
+ }
+
+ /**
+ * Computes the number of segments to move across the servers of the tier in
+ * order to balance the segment counts of the most unbalanced datasource.
+ */
+ static int computeSegmentsToMoveToBalanceCountsPerDatasource(
+ String tier,
+ List<ServerHolder> servers
+ )
+ {
+ // Find all the datasources
+ final Set<String> datasources = servers.stream().flatMap(
+ s ->
s.getProjectedSegments().getDatasourceToTotalSegmentCount().keySet().stream()
+ ).collect(Collectors.toSet());
+ if (datasources.isEmpty()) {
+ return 0;
+ }
+
+ // Compute the min and max number of segments for each datasource
+ final Object2IntMap<String> datasourceToMaxSegments = new
Object2IntOpenHashMap<>();
+ final Object2IntMap<String> datasourceToMinSegments = new
Object2IntOpenHashMap<>();
+ for (ServerHolder server : servers) {
+ final Object2IntMap<String> datasourceToSegmentCount
+ = server.getProjectedSegments().getDatasourceToTotalSegmentCount();
+ for (String datasource : datasources) {
+ int count = datasourceToSegmentCount.getInt(datasource);
+ datasourceToMaxSegments.mergeInt(datasource, count, Math::max);
+ datasourceToMinSegments.mergeInt(datasource, count, Math::min);
+ }
+ }
+
+ // Compute the gap between min and max for each datasource and order by
largest first
+ final TreeMap<Integer, String> countDiffToDatasource = new
TreeMap<>(Comparator.reverseOrder());
+ datasourceToMaxSegments.object2IntEntrySet().forEach(entry -> {
+ String datasource = entry.getKey();
+ int maxCount = entry.getIntValue();
+ int minCount = datasourceToMinSegments.getInt(datasource);
+ countDiffToDatasource.put(maxCount - minCount, datasource);
+ });
+
+ // Identify the most unbalanced datasource
+ final Map.Entry<Integer, String> maxCountDifference =
countDiffToDatasource.firstEntry();
+ String mostUnbalancedDatasource = maxCountDifference.getValue();
+ int minNumSegments = Integer.MAX_VALUE;
+ int maxNumSegments = 0;
+ for (ServerHolder server : servers) {
+ int countForSkewedDatasource = server.getProjectedSegments()
+ .getDatasourceToTotalSegmentCount()
+ .getInt(mostUnbalancedDatasource);
+
+ minNumSegments = Math.min(minNumSegments, countForSkewedDatasource);
+ maxNumSegments = Math.max(maxNumSegments, countForSkewedDatasource);
+ }
+
+ final int numSegmentsToMove = maxCountDifference.getKey() / 2;
+ if (numSegmentsToMove > 0) {
+ log.info(
+ "Need to move [%,d] segments of datasource[%s] in tier[%s] to fix
gap between min[%,d] and max[%,d].",
+ numSegmentsToMove, mostUnbalancedDatasource, tier, minNumSegments,
maxNumSegments
+ );
+ }
+ return numSegmentsToMove;
+ }
+
+ private static int computeSegmentsToMoveToBalanceDiskUsage(
+ String tier,
+ List<ServerHolder> servers
+ )
Review Comment:
I sort of wonder if this runs into conflicts with the cost balancer, which
primarily operates on interval adjacency instead of disk size, for datasources
where the segment sizes are not very homogeneous. I imagine it might, but it's
also probably not that harmful, because in the worst case it just considers more
segments to move than it needed to.
##########
server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.balancer;
+
+import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.SegmentCountsPerInterval;
+import org.apache.druid.server.coordinator.ServerHolder;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Calculates the maximum, minimum and required number of segments to move in a
+ * Coordinator run for balancing.
+ */
+public class SegmentToMoveCalculator
+{
+ /**
+ * At least this number of segments must be picked for moving in every cycle
+ * to keep the cluster well balanced.
+ */
+ private static final int MIN_SEGMENTS_TO_MOVE = 100;
+
+ private static final Logger log = new Logger(SegmentToMoveCalculator.class);
+
+ /**
+ * Calculates the number of segments to be picked for moving in the given
tier,
+ * based on the level of skew between the historicals in the tier.
+ *
+ * @param tier Name of tier used for logging purposes
+ * @param historicals Active historicals in tier
+ * @param maxSegmentsToMoveInTier Maximum number of segments allowed to be
moved
+ * in the tier.
+ * @return Number of segments to move in the tier in the range
+ * [{@link #MIN_SEGMENTS_TO_MOVE}, {@code maxSegmentsToMoveInTier}].
+ */
+ public static int computeNumSegmentsToMoveInTier(
+ String tier,
+ List<ServerHolder> historicals,
+ int maxSegmentsToMoveInTier
+ )
+ {
+ final int totalSegments = historicals.stream().mapToInt(
+ server -> server.getProjectedSegments().getTotalSegmentCount()
+ ).sum();
+
+ // Move at least some segments to ensure that the cluster is always
balancing itself
+ final int minSegmentsToMove = SegmentToMoveCalculator
+ .computeMinSegmentsToMoveInTier(totalSegments);
+ final int segmentsToMoveToFixDeviation = SegmentToMoveCalculator
+ .computeNumSegmentsToMoveToBalanceTier(tier, historicals);
+ log.info(
+ "Need to move [%,d] segments in tier[%s] to attain balance. Allowed
values are [min=%d, max=%d].",
+ segmentsToMoveToFixDeviation, tier, minSegmentsToMove,
maxSegmentsToMoveInTier
+ );
+
+ final int activeSegmentsToMove = Math.max(minSegmentsToMove,
segmentsToMoveToFixDeviation);
+ return Math.min(activeSegmentsToMove, maxSegmentsToMoveInTier);
+ }
+
+ /**
+ * Calculates the minimum number of segments that should be considered for
+ * moving in a tier, so that the cluster is always balancing itself.
+ * <p>
+ * This value must be calculated separately for every tier.
+ *
+ * @param totalSegmentsInTier Total number of all replicas of all segments
+ * loaded or queued across all historicals in the
tier.
+ * @return {@code minSegmentsToMoveInTier} in the range
+ * [{@link #MIN_SEGMENTS_TO_MOVE}, {@code ~0.6% of totalSegmentsInTier}].
+ */
+ public static int computeMinSegmentsToMoveInTier(int totalSegmentsInTier)
+ {
+ // Divide by 2^14 and multiply by 100 so that the value increases
+ // in steps of 100 for every 2^14 = ~16k segments
+ int upperBound = (totalSegmentsInTier >> 14) * 100;
+ int lowerBound = Math.min(MIN_SEGMENTS_TO_MOVE, totalSegmentsInTier);
+ return Math.max(lowerBound, upperBound);
+ }
+
+ /**
+ * Calculates the maximum number of segments that can be picked for moving in
+ * the cluster in a single coordinator run, assuming that the run must finish
+ * within 40s. A typical coordinator run period is 1 minute and there should
+ * be a buffer of 20s for other coordinator duties.
+ * <p>
+ * This value must be calculated at the cluster level and then applied
+ * to every tier so that the total computation time is estimated correctly.
+ * <p>
+ * Each balancer thread can perform 2 billion computations in 40s (see
#14584).
+ * Therefore,
+ * <pre>
+ * numComputations = maxSegmentsToMove * totalSegments
+ *
+ * maxSegmentsToMove = numComputations / totalSegments
+ * = (nThreads * 2B) / totalSegments
+ * </pre>
+ *
+ * @param totalSegments Total number of all replicas of all segments loaded
or
+ * queued across all historicals in the cluster.
+ * @return {@code maxSegmentsToMove} per tier in the range
+ * [{@link #MIN_SEGMENTS_TO_MOVE}, ~20% of totalSegments].
+ * @see <a href="https://github.com/apache/druid/pull/14584">#14584</a>
+ */
+ public static int computeMaxSegmentsToMovePerTier(
+ int totalSegments,
+ int numBalancerThreads
+ )
+ {
+ Preconditions.checkArgument(
+ numBalancerThreads > 0 && numBalancerThreads <= 100,
+ "Number of balancer threads must be in range (0, 100]."
+ );
+ if (totalSegments <= 0) {
+ return 0;
+ }
+
+ // Divide by 2^9 and multiply by 100 so that the upperBound
+ // increases in steps of 100 for every 2^9 = 512 segments (~20%)
+ final int upperBound = (totalSegments >> 9) * 100;
+ final int lowerBound = MIN_SEGMENTS_TO_MOVE;
+
+ // Each thread can do ~2B computations in one cycle = 2M * 1k = 2^21 * 1k
+ int maxComputationsInThousands = numBalancerThreads << 21;
+ int maxSegmentsToMove = (maxComputationsInThousands / totalSegments) *
1000;
+
+ if (upperBound < lowerBound) {
+ return Math.min(lowerBound, totalSegments);
+ } else {
+ return Math.min(maxSegmentsToMove, upperBound);
+ }
+ }
+
+ /**
+ * Computes the number of segments that need to be moved across the
historicals
+ * in a tier to attain balance in terms of disk usage and segment counts per
+ * data source.
+ *
+ * @param tier Name of the tier used only for logging purposes
+ * @param historicals List of historicals in the tier
+ */
+ public static int computeNumSegmentsToMoveToBalanceTier(String tier,
List<ServerHolder> historicals)
+ {
+ if (historicals.isEmpty()) {
+ return 0;
+ }
+
+ return Math.max(
+ computeSegmentsToMoveToBalanceCountsPerDatasource(tier, historicals),
+ computeSegmentsToMoveToBalanceDiskUsage(tier, historicals)
+ );
+ }
+
+ private static long getTotalUsageBytes(List<ServerHolder> servers)
+ {
+ return servers.stream()
+ .mapToLong(server ->
server.getProjectedSegments().getTotalSegmentBytes())
+ .sum();
+ }
+
+ private static int getTotalSegmentCount(List<ServerHolder> servers)
+ {
+ return servers.stream()
+ .mapToInt(server ->
server.getProjectedSegments().getTotalSegmentCount())
+ .sum();
Review Comment:
same comment about collecting in a single pass
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]