This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5d4ac64178 Adapt maxSegmentsToMove based on cluster skew (#14584)
5d4ac64178 is described below
commit 5d4ac641783a359bc9ca11d20d25ad99310e0fc5
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Aug 17 11:14:54 2023 +0530
Adapt maxSegmentsToMove based on cluster skew (#14584)
Changes:
- No change in behaviour if `smartSegmentLoading` is disabled
- If `smartSegmentLoading` is enabled
- Compute `balancerComputeThreads` based on `numUsedSegments`
- Compute `maxSegmentsToMove` based on `balancerComputeThreads`
- Compute `segmentsToMoveToFixSkew` based on usage skew
- Compute `segmentsToMove = Math.min(maxSegmentsToMove,
segmentsToMoveToFixSkew)`
Limits:
- 1 <= `balancerComputeThreads` <= 8
- `maxSegmentsToMove` <= 20% of total segments
- `minSegmentsToMove` = 0.15% of total segments
---
.../druid/server/coordinator/DruidCoordinator.java | 46 +--
.../coordinator/SegmentCountsPerInterval.java | 25 +-
.../druid/server/coordinator/ServerHolder.java | 24 +-
.../coordinator/balancer/BalancerStrategy.java | 5 +-
.../coordinator/balancer/CostBalancerStrategy.java | 14 +-
.../balancer/RandomBalancerStrategy.java | 2 +-
.../balancer/SegmentToMoveCalculator.java | 307 +++++++++++++++++++++
.../coordinator/balancer/TierSegmentBalancer.java | 116 ++++----
.../server/coordinator/duty/BalanceSegments.java | 74 ++++-
.../duty/CollectSegmentAndServerStats.java | 4 +-
.../coordinator/duty/UnloadUnusedSegments.java | 2 +-
.../coordinator/loading/SegmentLoadingConfig.java | 74 ++---
.../coordinator/stats/CoordinatorRunStats.java | 2 +
.../coordinator/BalanceSegmentsProfiler.java | 5 +-
.../server/coordinator/DruidCoordinatorTest.java | 26 +-
.../druid/server/coordinator/ServerHolderTest.java | 2 -
.../balancer/CostBalancerStrategyTest.java | 17 +-
.../balancer/SegmentToMoveCalculatorTest.java | 261 ++++++++++++++++++
.../coordinator/duty/BalanceSegmentsTest.java | 3 +-
.../loading/SegmentLoadingConfigTest.java | 51 ++++
.../simulate/CoordinatorSimulationBaseTest.java | 2 +-
.../simulate/CoordinatorSimulationBuilder.java | 5 +-
.../coordinator/simulate/SegmentBalancingTest.java | 37 +++
23 files changed, 940 insertions(+), 164 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 09d8e2b687..fd017dd5ed 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -542,26 +542,37 @@ public class DruidCoordinator
}
}
+ /**
+ * Resets the balancerExec if required and creates a new BalancerStrategy for
+ * the current coordinator run.
+ */
@VisibleForTesting
- protected void initBalancerExecutor()
+ BalancerStrategy createBalancerStrategy(int balancerComputeThreads)
{
- final int currentNumber = getDynamicConfigs().getBalancerComputeThreads();
-
+ // Reset balancerExecutor if required
if (balancerExec == null) {
- balancerExec = createNewBalancerExecutor(currentNumber);
- } else if (cachedBalancerThreadNumber != currentNumber) {
+ balancerExec = createNewBalancerExecutor(balancerComputeThreads);
+ } else if (cachedBalancerThreadNumber != balancerComputeThreads) {
log.info(
- "balancerComputeThreads has changed from [%d] to [%d], recreating
the thread pool.",
- cachedBalancerThreadNumber,
- currentNumber
+ "'balancerComputeThreads' has changed from [%d] to [%d]",
+ cachedBalancerThreadNumber, balancerComputeThreads
);
balancerExec.shutdownNow();
- balancerExec = createNewBalancerExecutor(currentNumber);
+ balancerExec = createNewBalancerExecutor(balancerComputeThreads);
}
+
+ // Create BalancerStrategy
+ final BalancerStrategy balancerStrategy =
balancerStrategyFactory.createBalancerStrategy(balancerExec);
+ log.info(
+ "Using balancer strategy[%s] with [%d] threads.",
+ balancerStrategy.getClass().getSimpleName(), balancerComputeThreads
+ );
+ return balancerStrategy;
}
private ListeningExecutorService createNewBalancerExecutor(int numThreads)
{
+ log.info("Creating new balancer executor with [%d] threads.", numThreads);
cachedBalancerThreadNumber = numThreads;
return MoreExecutors.listeningDecorator(
Execs.multiThreaded(numThreads, "coordinator-cost-balancer-%s")
@@ -576,7 +587,7 @@ public class DruidCoordinator
new UpdateReplicationStatus(),
new UnloadUnusedSegments(loadQueueManager),
new
MarkOvershadowedSegmentsAsUnused(segmentsMetadataManager::markSegmentsAsUnused),
- new BalanceSegments(),
+ new BalanceSegments(config.getCoordinatorPeriod()),
new CollectSegmentAndServerStats(DruidCoordinator.this)
);
}
@@ -750,7 +761,7 @@ public class DruidCoordinator
);
log.info(
- "Emitted [%d] stats for group [%s]. All collected stats:%s\n",
+ "Emitted [%d] stats for group [%s]. All collected stats:%s",
emittedCount.get(), dutyGroupName, allStats.buildStatsTable()
);
}
@@ -758,7 +769,7 @@ public class DruidCoordinator
// Emit the runtime of the full DutiesRunnable
final long runMillis =
groupRunTime.stop().elapsed(TimeUnit.MILLISECONDS);
emitStat(Stats.CoordinatorRun.GROUP_RUN_TIME, Collections.emptyMap(),
runMillis);
- log.info("Finished coordinator run for group [%s] in [%d] ms",
dutyGroupName, runMillis);
+ log.info("Finished coordinator run for group [%s] in [%d] ms.%n",
dutyGroupName, runMillis);
}
catch (Exception e) {
log.makeAlert(e, "Caught exception, ignoring so that schedule keeps
going.").emit();
@@ -818,15 +829,8 @@ public class DruidCoordinator
final DruidCluster cluster = prepareCluster(dynamicConfig,
segmentLoadingConfig, currentServers);
cancelLoadsOnDecommissioningServers(cluster);
- initBalancerExecutor();
- final BalancerStrategy balancerStrategy =
balancerStrategyFactory.createBalancerStrategy(balancerExec);
- log.info(
- "Using balancer strategy [%s] with round-robin assignment [%s] and
debug dimensions [%s].",
- balancerStrategy.getClass().getSimpleName(),
- segmentLoadingConfig.isUseRoundRobinSegmentAssignment(),
- dynamicConfig.getDebugDimensions()
- );
-
+ final BalancerStrategy balancerStrategy
+ =
createBalancerStrategy(segmentLoadingConfig.getBalancerComputeThreads());
return params.buildFromExisting()
.withDruidCluster(cluster)
.withBalancerStrategy(balancerStrategy)
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java
b/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java
index ea1f81ee6d..a9767b8f76 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java
@@ -33,18 +33,37 @@ import java.util.Map;
*/
public class SegmentCountsPerInterval
{
- private final Map<String, Object2IntMap<Interval>>
- datasourceIntervalToSegmentCount = new HashMap<>();
+ private int totalSegments;
+ private long totalSegmentBytes;
+ private final Map<String, Object2IntMap<Interval>>
datasourceIntervalToSegmentCount = new HashMap<>();
private final Object2IntMap<Interval> intervalToTotalSegmentCount = new
Object2IntOpenHashMap<>();
+ private final Object2IntMap<String> datasourceToTotalSegmentCount = new
Object2IntOpenHashMap<>();
public void addSegment(DataSegment segment)
{
updateCountInInterval(segment, 1);
+ totalSegmentBytes += segment.getSize();
}
public void removeSegment(DataSegment segment)
{
updateCountInInterval(segment, -1);
+ totalSegmentBytes -= segment.getSize();
+ }
+
+ public int getTotalSegmentCount()
+ {
+ return totalSegments;
+ }
+
+ public long getTotalSegmentBytes()
+ {
+ return totalSegmentBytes;
+ }
+
+ public Object2IntMap<String> getDatasourceToTotalSegmentCount()
+ {
+ return datasourceToTotalSegmentCount;
}
public Object2IntMap<Interval> getIntervalToSegmentCount(String datasource)
@@ -59,7 +78,9 @@ public class SegmentCountsPerInterval
private void updateCountInInterval(DataSegment segment, int delta)
{
+ totalSegments += delta;
intervalToTotalSegmentCount.mergeInt(segment.getInterval(), delta,
Integer::sum);
+ datasourceToTotalSegmentCount.mergeInt(segment.getDataSource(), delta,
Integer::sum);
datasourceIntervalToSegmentCount
.computeIfAbsent(segment.getDataSource(), ds -> new
Object2IntOpenHashMap<>())
.mergeInt(segment.getInterval(), delta, Integer::sum);
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
index d55ac035d6..5de1bd5ee0 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
@@ -181,6 +181,23 @@ public class ServerHolder implements
Comparable<ServerHolder>
return server.getMaxSize();
}
+ /**
+ * Total projected disk usage of this server in bytes.
+ * <p>
+ * The total size:
+ * <ol>
+ * <li>INCLUDES segments loaded on this server</li>
+ * <li>INCLUDES segments loading on this server (actions:
LOAD/REPLICATE)</li>
+ * <li>INCLUDES segments moving to this server (action: MOVE_TO)</li>
+ * <li>INCLUDES segments moving from this server (action: MOVE_FROM). This is
+ * because these segments have only been <i>marked</i> for drop. We include
+ * the size of these segments to avoid over-assigning the server in case the
+ * corresponding MOVE_TO operation gets delayed or fails.</li>
+ * <li>EXCLUDES segments dropping from this server (action: DROP). Excluding
+ * these segments cannot result in over-assignment because drops are always
+ * processed before loads.</li>
+ * </ol>
+ */
public long getSizeUsed()
{
return server.getCurrSize() + sizeOfLoadingSegments -
sizeOfDroppingSegments;
@@ -317,6 +334,11 @@ public class ServerHolder implements
Comparable<ServerHolder>
return loadingReplicaCount;
}
+ public int getNumQueuedSegments()
+ {
+ return queuedSegments.size();
+ }
+
public boolean startOperation(SegmentAction action, DataSegment segment)
{
if (queuedSegments.containsKey(segment)) {
@@ -349,7 +371,7 @@ public class ServerHolder implements
Comparable<ServerHolder>
}
}
- public boolean hasSegmentLoaded(SegmentId segmentId)
+ private boolean hasSegmentLoaded(SegmentId segmentId)
{
return server.getSegment(segmentId) != null;
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java
index e133430b1b..9def032834 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java
@@ -72,8 +72,7 @@ public interface BalancerStrategy
Iterator<ServerHolder> findServersToDropSegment(DataSegment segmentToDrop,
List<ServerHolder> serverHolders);
/**
- * Returns the stats collected by the strategy in the current run and resets
- * the stats collector for the next run.
+ * Returns the stats collected by the strategy.
*/
- CoordinatorRunStats getAndResetStats();
+ CoordinatorRunStats getStats();
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java
index aae907bb59..96a6ccccf5 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java
@@ -46,6 +46,7 @@ import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
public class CostBalancerStrategy implements BalancerStrategy
@@ -68,6 +69,7 @@ public class CostBalancerStrategy implements BalancerStrategy
.thenComparing(pair -> pair.rhs);
private final CoordinatorRunStats stats = new CoordinatorRunStats();
+ private final AtomicLong computeTimeNanos = new AtomicLong(0);
public static double computeJointSegmentsCost(DataSegment segment,
Iterable<DataSegment> segmentSet)
{
@@ -263,9 +265,13 @@ public class CostBalancerStrategy implements
BalancerStrategy
}
@Override
- public CoordinatorRunStats getAndResetStats()
+ public CoordinatorRunStats getStats()
{
- return stats.getSnapshotAndReset();
+ stats.add(
+ Stats.Balancer.COMPUTATION_TIME,
+ TimeUnit.NANOSECONDS.toMillis(computeTimeNanos.getAndSet(0))
+ );
+ return stats;
}
/**
@@ -351,8 +357,8 @@ public class CostBalancerStrategy implements
BalancerStrategy
// Report computation stats
computeTime.stop();
- stats.add(Stats.Balancer.COMPUTATION_COUNT, metricKey, 1);
- stats.add(Stats.Balancer.COMPUTATION_TIME, metricKey,
computeTime.elapsed(TimeUnit.MILLISECONDS));
+ stats.add(Stats.Balancer.COMPUTATION_COUNT, 1);
+ computeTimeNanos.addAndGet(computeTime.elapsed(TimeUnit.NANOSECONDS));
return costPrioritizedServers.stream().map(pair -> pair.rhs)
.collect(Collectors.toList());
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java
index fc0da34310..b8e0000567 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java
@@ -73,7 +73,7 @@ public class RandomBalancerStrategy implements
BalancerStrategy
}
@Override
- public CoordinatorRunStats getAndResetStats()
+ public CoordinatorRunStats getStats()
{
return CoordinatorRunStats.empty();
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java
new file mode 100644
index 0000000000..d8846740a7
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.balancer;
+
+import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.SegmentCountsPerInterval;
+import org.apache.druid.server.coordinator.ServerHolder;
+import org.joda.time.Duration;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Calculates the maximum, minimum and required number of segments to move in a
+ * Coordinator run for balancing.
+ */
+public class SegmentToMoveCalculator
+{
+ /**
+ * At least this number of segments must be picked for moving in every cycle
+ * to keep the cluster well balanced.
+ */
+ private static final int MIN_SEGMENTS_TO_MOVE = 100;
+
+ private static final Logger log = new Logger(SegmentToMoveCalculator.class);
+
+ /**
+ * Calculates the number of segments to be picked for moving in the given
tier,
+ * based on the level of skew between the historicals in the tier.
+ *
+ * @param tier Name of tier used for logging purposes
+ * @param historicals Active historicals in tier
+ * @param maxSegmentsToMoveInTier Maximum number of segments allowed to be
moved
+ * in the tier.
+ * @return Number of segments to move in the tier in the range
+ * [{@link #MIN_SEGMENTS_TO_MOVE}, {@code maxSegmentsToMoveInTier}].
+ */
+ public static int computeNumSegmentsToMoveInTier(
+ String tier,
+ List<ServerHolder> historicals,
+ int maxSegmentsToMoveInTier
+ )
+ {
+ final int totalSegments = historicals.stream().mapToInt(
+ server -> server.getProjectedSegments().getTotalSegmentCount()
+ ).sum();
+
+ // Move at least some segments to ensure that the cluster is always
balancing itself
+ final int minSegmentsToMove = SegmentToMoveCalculator
+ .computeMinSegmentsToMoveInTier(totalSegments);
+ final int segmentsToMoveToFixDeviation = SegmentToMoveCalculator
+ .computeNumSegmentsToMoveToBalanceTier(tier, historicals);
+ log.info(
+ "Need to move [%,d] segments in tier[%s] to attain balance. Allowed
values are [min=%d, max=%d].",
+ segmentsToMoveToFixDeviation, tier, minSegmentsToMove,
maxSegmentsToMoveInTier
+ );
+
+ final int activeSegmentsToMove = Math.max(minSegmentsToMove,
segmentsToMoveToFixDeviation);
+ return Math.min(activeSegmentsToMove, maxSegmentsToMoveInTier);
+ }
+
+ /**
+ * Calculates the minimum number of segments that should be considered for
+ * moving in a tier, so that the cluster is always balancing itself.
+ * <p>
+ * This value must be calculated separately for every tier.
+ *
+ * @param totalSegmentsInTier Total number of all replicas of all segments
+ * loaded or queued across all historicals in the
tier.
+ * @return {@code minSegmentsToMoveInTier} in the range
+ * [{@link #MIN_SEGMENTS_TO_MOVE}, {@code ~0.15% of totalSegmentsInTier}].
+ */
+ public static int computeMinSegmentsToMoveInTier(int totalSegmentsInTier)
+ {
+ // Divide by 2^16 and multiply by 100 so that the value increases
+ // in steps of 100 for every 2^16 = ~65k segments
+ int upperBound = (totalSegmentsInTier >> 16) * 100;
+ int lowerBound = Math.min(MIN_SEGMENTS_TO_MOVE, totalSegmentsInTier);
+ return Math.max(lowerBound, upperBound);
+ }
+
+ /**
+ * Calculates the maximum number of segments that can be picked for moving in
+ * the cluster in a single coordinator run.
+ * <p>
+ * This value must be calculated at the cluster level and then applied
+ * to every tier so that the total computation time is estimated correctly.
+ * <p>
+ * Each balancer thread can perform 1 billion computations in 20s (see
#14584).
+ * Therefore, keeping a buffer of 10s, in every 30s:
+ * <pre>
+ * numComputations = maxSegmentsToMove * totalSegments
+ *
+ * maxSegmentsToMove = numComputations / totalSegments
+ * = (nThreads * 1B) / totalSegments
+ * </pre>
+ *
+ * @param totalSegments Total number of all replicas of all segments loaded
or
+ * queued across all historicals in the cluster.
+ * @return {@code maxSegmentsToMove} per tier in the range
+ * [{@link #MIN_SEGMENTS_TO_MOVE}, ~20% of totalSegments].
+ * @see <a href="https://github.com/apache/druid/pull/14584">#14584</a>
+ */
+ public static int computeMaxSegmentsToMovePerTier(
+ int totalSegments,
+ int numBalancerThreads,
+ Duration coordinatorPeriod
+ )
+ {
+ Preconditions.checkArgument(
+ numBalancerThreads > 0 && numBalancerThreads <= 100,
+ "Number of balancer threads must be in range (0, 100]."
+ );
+ if (totalSegments <= 0) {
+ return 0;
+ }
+
+ // Divide by 2^9 and multiply by 100 so that the upperBound
+ // increases in steps of 100 for every 2^9 = 512 segments (~20%)
+ final int upperBound = (totalSegments >> 9) * 100;
+ final int lowerBound = MIN_SEGMENTS_TO_MOVE;
+
+ int num30sPeriods = Math.min(4, (int) (coordinatorPeriod.getMillis() /
30_000));
+
+ // Each thread can do ~1B computations in 30s = 1M * 1k = 2^20 * 1k
+ int maxComputationsInThousands = (numBalancerThreads * num30sPeriods) <<
20;
+ int maxSegmentsToMove = (maxComputationsInThousands / totalSegments) *
1000;
+
+ if (upperBound < lowerBound) {
+ return Math.min(lowerBound, totalSegments);
+ } else {
+ return Math.min(maxSegmentsToMove, upperBound);
+ }
+ }
+
+ /**
+ * Computes the number of segments that need to be moved across the
historicals
+ * in a tier to attain balance in terms of disk usage and segment counts per
+ * data source.
+ *
+ * @param tier Name of the tier used only for logging purposes
+ * @param historicals List of historicals in the tier
+ */
+ public static int computeNumSegmentsToMoveToBalanceTier(String tier,
List<ServerHolder> historicals)
+ {
+ if (historicals.isEmpty()) {
+ return 0;
+ }
+
+ return Math.max(
+ computeSegmentsToMoveToBalanceCountsPerDatasource(tier, historicals),
+ computeSegmentsToMoveToBalanceDiskUsage(tier, historicals)
+ );
+ }
+
+ private static double getAverageSegmentSize(List<ServerHolder> servers)
+ {
+ int totalSegmentCount = 0;
+ long totalUsageBytes = 0;
+ for (ServerHolder server : servers) {
+ totalSegmentCount +=
server.getProjectedSegments().getTotalSegmentCount();
+ totalUsageBytes += server.getProjectedSegments().getTotalSegmentBytes();
+ }
+
+ if (totalSegmentCount <= 0 || totalUsageBytes <= 0) {
+ return 0;
+ } else {
+ return (1.0 * totalUsageBytes) / totalSegmentCount;
+ }
+ }
+
+ /**
+ * Computes the number of segments to move across the servers of the tier in
+ * order to balance the segment counts of the most unbalanced datasource.
+ */
+ static int computeSegmentsToMoveToBalanceCountsPerDatasource(
+ String tier,
+ List<ServerHolder> servers
+ )
+ {
+ // Find all the datasources
+ final Set<String> datasources = servers.stream().flatMap(
+ s ->
s.getProjectedSegments().getDatasourceToTotalSegmentCount().keySet().stream()
+ ).collect(Collectors.toSet());
+ if (datasources.isEmpty()) {
+ return 0;
+ }
+
+ // Compute the min and max number of segments for each datasource
+ final Object2IntMap<String> datasourceToMaxSegments = new
Object2IntOpenHashMap<>();
+ final Object2IntMap<String> datasourceToMinSegments = new
Object2IntOpenHashMap<>();
+ for (ServerHolder server : servers) {
+ final Object2IntMap<String> datasourceToSegmentCount
+ = server.getProjectedSegments().getDatasourceToTotalSegmentCount();
+ for (String datasource : datasources) {
+ int count = datasourceToSegmentCount.getInt(datasource);
+ datasourceToMaxSegments.mergeInt(datasource, count, Math::max);
+ datasourceToMinSegments.mergeInt(datasource, count, Math::min);
+ }
+ }
+
+ // Compute the gap between min and max for each datasource and order by
largest first
+ final TreeMap<Integer, String> countDiffToDatasource = new
TreeMap<>(Comparator.reverseOrder());
+ datasourceToMaxSegments.object2IntEntrySet().forEach(entry -> {
+ String datasource = entry.getKey();
+ int maxCount = entry.getIntValue();
+ int minCount = datasourceToMinSegments.getInt(datasource);
+ countDiffToDatasource.put(maxCount - minCount, datasource);
+ });
+
+ // Identify the most unbalanced datasource
+ final Map.Entry<Integer, String> maxCountDifference =
countDiffToDatasource.firstEntry();
+ String mostUnbalancedDatasource = maxCountDifference.getValue();
+ int minNumSegments = Integer.MAX_VALUE;
+ int maxNumSegments = 0;
+ for (ServerHolder server : servers) {
+ int countForSkewedDatasource = server.getProjectedSegments()
+ .getDatasourceToTotalSegmentCount()
+ .getInt(mostUnbalancedDatasource);
+
+ minNumSegments = Math.min(minNumSegments, countForSkewedDatasource);
+ maxNumSegments = Math.max(maxNumSegments, countForSkewedDatasource);
+ }
+
+ final int numSegmentsToMove = maxCountDifference.getKey() / 2;
+ if (numSegmentsToMove > 0) {
+ log.info(
+ "Need to move [%,d] segments of datasource[%s] in tier[%s] to fix
gap between min[%,d] and max[%,d].",
+ numSegmentsToMove, mostUnbalancedDatasource, tier, minNumSegments,
maxNumSegments
+ );
+ }
+ return numSegmentsToMove;
+ }
+
+ private static int computeSegmentsToMoveToBalanceDiskUsage(
+ String tier,
+ List<ServerHolder> servers
+ )
+ {
+ if (servers.isEmpty()) {
+ return 0;
+ }
+
+ double maxUsagePercent = 0.0;
+ double minUsagePercent = 100.0;
+
+ long maxUsageBytes = 0;
+ long minUsageBytes = Long.MAX_VALUE;
+ for (ServerHolder server : servers) {
+ final SegmentCountsPerInterval projectedSegments =
server.getProjectedSegments();
+
+ // Track the maximum and minimum values
+ long serverUsageBytes = projectedSegments.getTotalSegmentBytes();
+ maxUsageBytes = Math.max(serverUsageBytes, maxUsageBytes);
+ minUsageBytes = Math.min(serverUsageBytes, minUsageBytes);
+
+ double diskUsage = server.getMaxSize() <= 0
+ ? 0 : (100.0 *
projectedSegments.getTotalSegmentBytes()) / server.getMaxSize();
+ maxUsagePercent = Math.max(diskUsage, maxUsagePercent);
+ minUsagePercent = Math.min(diskUsage, minUsagePercent);
+ }
+
+ final double averageSegmentSize = getAverageSegmentSize(servers);
+ final long differenceInUsageBytes = maxUsageBytes - minUsageBytes;
+ final int numSegmentsToMove = averageSegmentSize <= 0
+ ? 0 : (int) (differenceInUsageBytes /
averageSegmentSize) / 2;
+
+ if (numSegmentsToMove > 0) {
+ log.info(
+ "Need to move [%,d] segments of avg size [%,d MB] in tier[%s] to fix"
+ + " disk usage gap between min[%d GB][%.1f%%] and max[%d
GB][%.1f%%].",
+ numSegmentsToMove, ((long) averageSegmentSize) >> 20, tier,
+ minUsageBytes >> 30, minUsagePercent, maxUsageBytes >> 30,
maxUsagePercent
+ );
+ }
+ return numSegmentsToMove;
+ }
+
+ private SegmentToMoveCalculator()
+ {
+ // no instantiation
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java
index 86e5b99629..1523576b11 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java
@@ -21,9 +21,9 @@ package org.apache.druid.server.coordinator.balancer;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
-import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
import org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Dimension;
@@ -55,27 +55,23 @@ public class TierSegmentBalancer
private final DruidCoordinatorRuntimeParams params;
private final StrategicSegmentAssigner segmentAssigner;
- private final SegmentLoadingConfig loadingConfig;
private final CoordinatorRunStats runStats;
private final List<ServerHolder> activeServers;
private final List<ServerHolder> decommissioningServers;
- private final int totalMaxSegmentsToMove;
-
+ private final int maxSegmentsToMove;
private final int movingSegmentCount;
public TierSegmentBalancer(
String tier,
Set<ServerHolder> servers,
+ int maxSegmentsToMove,
DruidCoordinatorRuntimeParams params
)
{
this.tier = tier;
this.params = params;
this.segmentAssigner = params.getSegmentAssigner();
-
- this.loadingConfig = params.getSegmentLoadingConfig();
- this.totalMaxSegmentsToMove = loadingConfig.getMaxSegmentsToMove();
this.runStats = segmentAssigner.getStats();
Map<Boolean, List<ServerHolder>> partitions =
@@ -84,55 +80,29 @@ public class TierSegmentBalancer
this.activeServers = partitions.get(false);
this.movingSegmentCount =
activeServers.stream().mapToInt(ServerHolder::getNumMovingSegments).sum();
+ this.maxSegmentsToMove = maxSegmentsToMove;
}
public void run()
{
- if (activeServers.isEmpty() || (activeServers.size() <= 1 &&
decommissioningServers.isEmpty())) {
- log.warn(
- "Skipping balance for tier [%s] with [%d] active servers and [%d]
decomissioning servers.",
- tier, activeServers.size(), decommissioningServers.size()
- );
- return;
- }
-
- log.info(
- "Moving max [%d] segments in tier [%s] with [%d] active servers and"
- + " [%d] decommissioning servers. There are [%d] segments already in
queue.",
- totalMaxSegmentsToMove, tier, activeServers.size(),
decommissioningServers.size(), movingSegmentCount
- );
-
- // Move segments from decommissioning to active servers
- int movedDecommSegments = 0;
- if (!decommissioningServers.isEmpty()) {
- int maxDecommPercentToMove =
loadingConfig.getPercentDecommSegmentsToMove();
- int maxDecommSegmentsToMove = (int) Math.ceil(totalMaxSegmentsToMove *
(maxDecommPercentToMove / 100.0));
- movedDecommSegments +=
- moveSegmentsFromTo(decommissioningServers, activeServers,
maxDecommSegmentsToMove);
- log.info(
- "Moved [%d] segments out of max [%d (%d%%)] from decommissioning to
active servers in tier [%s].",
- movedDecommSegments, maxDecommSegmentsToMove,
maxDecommPercentToMove, tier
- );
- }
+ int numDecommSegmentsToMove =
getNumDecommSegmentsToMove(maxSegmentsToMove);
+ moveSegmentsFrom(decommissioningServers, numDecommSegmentsToMove,
"decommissioning");
- // Move segments across active servers
- int maxGeneralSegmentsToMove = totalMaxSegmentsToMove -
movedDecommSegments;
- int movedGeneralSegments =
- moveSegmentsFromTo(activeServers, activeServers,
maxGeneralSegmentsToMove);
- log.info(
- "Moved [%d] segments out of max [%d] between active servers in tier
[%s].",
- movedGeneralSegments, maxGeneralSegmentsToMove, tier
- );
+ int numActiveSegmentsToMove = getNumActiveSegmentsToMove(maxSegmentsToMove
- numDecommSegmentsToMove);
+ moveSegmentsFrom(activeServers, numActiveSegmentsToMove, "active");
}
- private int moveSegmentsFromTo(
- List<ServerHolder> sourceServers,
- List<ServerHolder> destServers,
- int maxSegmentsToMove
+ /**
+ * Moves segments from the given source servers to the active servers in
this tier.
+ */
+ private void moveSegmentsFrom(
+ final List<ServerHolder> sourceServers,
+ final int numSegmentsToMove,
+ final String sourceServerType
)
{
- if (maxSegmentsToMove <= 0 || sourceServers.isEmpty() ||
destServers.isEmpty()) {
- return 0;
+ if (numSegmentsToMove <= 0 || sourceServers.isEmpty() ||
activeServers.isEmpty()) {
+ return;
}
final Set<String> broadcastDatasources = params.getBroadcastDatasources();
@@ -140,25 +110,30 @@ public class TierSegmentBalancer
// Always move loading segments first as it is a cheaper operation
List<BalancerSegmentHolder> pickedSegments =
ReservoirSegmentSampler.pickMovableSegmentsFrom(
sourceServers,
- maxSegmentsToMove,
+ numSegmentsToMove,
ServerHolder::getLoadingSegments,
broadcastDatasources
);
- int movedCount = moveSegmentsTo(destServers, pickedSegments,
maxSegmentsToMove);
+ int movedCount = moveSegmentsTo(activeServers, pickedSegments,
numSegmentsToMove);
// Move loaded segments only if tier is not already busy moving segments
if (movingSegmentCount <= 0) {
- maxSegmentsToMove -= movedCount;
+ int numLoadedSegmentsToMove = numSegmentsToMove - movedCount;
pickedSegments = ReservoirSegmentSampler.pickMovableSegmentsFrom(
sourceServers,
- maxSegmentsToMove,
+ numLoadedSegmentsToMove,
server -> server.getServer().iterateAllSegments(),
broadcastDatasources
);
- movedCount += moveSegmentsTo(destServers, pickedSegments,
maxSegmentsToMove);
+ movedCount += moveSegmentsTo(activeServers, pickedSegments,
numLoadedSegmentsToMove);
+ } else {
+ log.info("There are already [%,d] segments moving in tier[%s].",
movingSegmentCount, tier);
}
- return movedCount;
+ log.info(
+ "Moved [%,d of %,d] segments from [%d] [%s] servers in tier [%s].",
+ movedCount, numSegmentsToMove, sourceServers.size(), sourceServerType,
tier
+ );
}
private int moveSegmentsTo(
@@ -221,4 +196,39 @@ public class TierSegmentBalancer
runStats.add(Stats.Segments.MOVE_SKIPPED, key, 1);
}
+ /**
+ * Number of segments to move away from the decommissioning historicals of
this tier.
+ */
+ private int getNumDecommSegmentsToMove(int maxSegmentsToMove)
+ {
+ final CoordinatorDynamicConfig dynamicConfig =
params.getCoordinatorDynamicConfig();
+ if (decommissioningServers.isEmpty() || activeServers.isEmpty()) {
+ return 0;
+ } else if (dynamicConfig.isSmartSegmentLoading()) {
+ final int decommSegmentsToMove =
decommissioningServers.stream().mapToInt(
+ server -> server.getProjectedSegments().getTotalSegmentCount()
+ ).sum();
+ return Math.min(decommSegmentsToMove, maxSegmentsToMove);
+ } else {
+ int maxPercentageToMove =
dynamicConfig.getDecommissioningMaxPercentOfMaxSegmentsToMove();
+ return (int) Math.ceil(maxSegmentsToMove * (maxPercentageToMove /
100.0));
+ }
+ }
+
+ /**
+ * Number of segments to move between the active historicals of this tier.
+ */
+ private int getNumActiveSegmentsToMove(int maxActiveSegmentsToMove)
+ {
+ final CoordinatorDynamicConfig dynamicConfig =
params.getCoordinatorDynamicConfig();
+ if (activeServers.size() < 2) {
+ return 0;
+ } else if (dynamicConfig.isSmartSegmentLoading()) {
+ return SegmentToMoveCalculator.computeNumSegmentsToMoveInTier(tier,
activeServers, maxActiveSegmentsToMove);
+ } else {
+ // If smartSegmentLoading is disabled, just use the configured value
+ return maxActiveSegmentsToMove;
+ }
+ }
+
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
index 4ff2e65743..37e58ff28f 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
@@ -19,12 +19,19 @@
package org.apache.druid.server.coordinator.duty;
+import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.balancer.SegmentToMoveCalculator;
import org.apache.druid.server.coordinator.balancer.TierSegmentBalancer;
import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.joda.time.Duration;
+
+import java.util.Set;
/**
*
@@ -33,6 +40,13 @@ public class BalanceSegments implements CoordinatorDuty
{
private static final EmittingLogger log = new
EmittingLogger(BalanceSegments.class);
+ private final Duration coordinatorPeriod;
+
+ public BalanceSegments(Duration coordinatorPeriod)
+ {
+ this.coordinatorPeriod = coordinatorPeriod;
+ }
+
@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams
params)
{
@@ -43,27 +57,79 @@ public class BalanceSegments implements CoordinatorDuty
final DruidCluster cluster = params.getDruidCluster();
final SegmentLoadingConfig loadingConfig =
params.getSegmentLoadingConfig();
- final int maxSegmentsToMove = loadingConfig.getMaxSegmentsToMove();
+
+ final int maxSegmentsToMove = getMaxSegmentsToMove(params);
if (maxSegmentsToMove <= 0) {
log.info("Skipping balance as maxSegmentsToMove is [%d].",
maxSegmentsToMove);
return params;
} else {
log.info(
- "Balancing segments in tiers [%s] with maxSegmentsToMove=[%d],
maxLifetime=[%d].",
+ "Balancing segments in tiers [%s] with maxSegmentsToMove[%,d] and
maxLifetime[%d].",
cluster.getTierNames(), maxSegmentsToMove,
loadingConfig.getMaxLifetimeInLoadQueue()
);
}
cluster.getHistoricals().forEach(
- (tier, servers) -> new TierSegmentBalancer(tier, servers, params).run()
+ (tier, servers) -> new TierSegmentBalancer(tier, servers,
maxSegmentsToMove, params).run()
);
CoordinatorRunStats runStats = params.getCoordinatorStats();
params.getBalancerStrategy()
- .getAndResetStats()
+ .getStats()
.forEachStat(runStats::add);
return params;
}
+ /**
+ * Recomputes the value of {@code maxSegmentsToMove} if smart segment loading
+ * is enabled. {@code maxSegmentsToMove} defines only the upper bound, the
actual
+ * number of segments picked for moving is determined by the {@link
TierSegmentBalancer}
+ * based on the level of skew in the tier.
+ */
+ private int getMaxSegmentsToMove(DruidCoordinatorRuntimeParams params)
+ {
+ final CoordinatorDynamicConfig dynamicConfig =
params.getCoordinatorDynamicConfig();
+ if (dynamicConfig.isSmartSegmentLoading()) {
+ final Pair<Integer, Integer> numHistoricalsAndSegments =
getNumHistoricalsAndSegments(params.getDruidCluster());
+ final int totalSegmentsInCluster = numHistoricalsAndSegments.rhs;
+
+ final int numBalancerThreads =
params.getSegmentLoadingConfig().getBalancerComputeThreads();
+ final int maxSegmentsToMove = SegmentToMoveCalculator
+ .computeMaxSegmentsToMovePerTier(totalSegmentsInCluster,
numBalancerThreads, coordinatorPeriod);
+ log.info(
+ "Computed maxSegmentsToMove[%,d] for total [%,d] segments on [%d]
historicals.",
+ maxSegmentsToMove, totalSegmentsInCluster,
numHistoricalsAndSegments.lhs
+ );
+
+ return maxSegmentsToMove;
+ } else {
+ return dynamicConfig.getMaxSegmentsToMove();
+ }
+ }
+
+ /**
+ * Calculates the total number of historicals (active and decommissioning)
and
+ * the total number of segments on these historicals that would participate
in
+ * cost computations. This includes all replicas of all loaded, loading,
dropping
+ * and moving segments.
+ * <p>
+ * This is calculated here to ensure that all assignments done by the
preceding
+ * {@link RunRules} duty are accounted for.
+ */
+ private Pair<Integer, Integer> getNumHistoricalsAndSegments(DruidCluster
cluster)
+ {
+ int numHistoricals = 0;
+ int numSegments = 0;
+
+ for (Set<ServerHolder> historicals : cluster.getHistoricals().values()) {
+ for (ServerHolder historical : historicals) {
+ ++numHistoricals;
+ numSegments += historical.getServer().getNumSegments() +
historical.getNumQueuedSegments();
+ }
+ }
+
+ return Pair.of(numHistoricals, numSegments);
+ }
+
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
index 1c0ff573d1..8c5acaeebb 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
@@ -144,8 +144,8 @@ public class CollectSegmentAndServerStats implements
CoordinatorDuty
final int numHistoricals = historicals.size();
log.info(
- "Tier [%s] is serving [%,d], loading [%,d] and dropping [%,d] segments"
- + " across [%d] historicals with average usage [%d GBs], [%.1f%%].",
+ "Tier[%s] is serving [%,d], loading [%,d] and dropping [%,d] segments"
+ + " across [%d] historicals with average usage[%d GBs], [%.1f%%].",
tier, servedCount.get(), loadingCount.get(), droppingCount.get(),
numHistoricals,
(currentBytesSum.get() >> 30) / numHistoricals, usageSum.get() /
numHistoricals
);
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
index 8bdcee3641..5a135d81cf 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
@@ -97,7 +97,7 @@ public class UnloadUnusedSegments implements CoordinatorDuty
&& loadQueueManager.dropSegment(segment, serverHolder)) {
totalUnneededCount++;
log.debug(
- "Dropping uneeded segment[%s] from server[%s] in tier[%s]",
+ "Dropping uneeded segment[%s] from server[%s] in tier[%s].",
segment.getId(), server.getName(), server.getTier()
);
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
index bb42e95eaa..f08b7ed5ca 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
@@ -35,33 +35,27 @@ public class SegmentLoadingConfig
private final int maxReplicaAssignmentsInRun;
private final int maxLifetimeInLoadQueue;
- private final int maxSegmentsToMove;
- private final int percentDecommSegmentsToMove;
+ private final int balancerComputeThreads;
private final boolean useRoundRobinSegmentAssignment;
/**
- * Creates a new SegmentLoadingConfig with recomputed coordinator config
values from
+ * Creates a new SegmentLoadingConfig with recomputed coordinator config
values
* based on whether {@link CoordinatorDynamicConfig#isSmartSegmentLoading()}
* is enabled or not.
*/
public static SegmentLoadingConfig create(CoordinatorDynamicConfig
dynamicConfig, int numUsedSegments)
{
if (dynamicConfig.isSmartSegmentLoading()) {
- // Compute recommended values
- // Impose a lower bound on both replicationThrottleLimit and
maxSegmentsToMove
+ // Compute replicationThrottleLimit with a lower bound of 100
final int throttlePercentage = 2;
final int replicationThrottleLimit = Math.max(100, numUsedSegments *
throttlePercentage / 100);
-
- // Impose an upper bound on maxSegmentsToMove to ensure that coordinator
- // run times are bounded. This limit can be relaxed as performance of
- // the CostBalancerStrategy.computeCost() is improved.
- final int maxSegmentsToMove = Math.min(1000, replicationThrottleLimit);
+ final int balancerComputeThreads =
computeNumBalancerThreads(numUsedSegments);
log.info(
- "Smart segment loading is enabled. Recomputed
replicationThrottleLimit"
- + " [%,d] (%d%% of used segments [%,d]) and maxSegmentsToMove
[%,d].",
- replicationThrottleLimit, throttlePercentage, numUsedSegments,
maxSegmentsToMove
+ "Smart segment loading is enabled. Calculated
balancerComputeThreads[%d]"
+ + " and replicationThrottleLimit[%,d] (%d%% of used segments[%,d]).",
+ balancerComputeThreads, replicationThrottleLimit,
throttlePercentage, numUsedSegments
);
return new SegmentLoadingConfig(
@@ -69,9 +63,8 @@ public class SegmentLoadingConfig
replicationThrottleLimit,
Integer.MAX_VALUE,
60,
- maxSegmentsToMove,
- 100,
- true
+ true,
+ balancerComputeThreads
);
} else {
// Use the configured values
@@ -80,9 +73,8 @@ public class SegmentLoadingConfig
dynamicConfig.getReplicationThrottleLimit(),
dynamicConfig.getMaxNonPrimaryReplicantsToLoad(),
dynamicConfig.getReplicantLifetime(),
- dynamicConfig.getMaxSegmentsToMove(),
- dynamicConfig.getDecommissioningMaxPercentOfMaxSegmentsToMove(),
- dynamicConfig.isUseRoundRobinSegmentAssignment()
+ dynamicConfig.isUseRoundRobinSegmentAssignment(),
+ dynamicConfig.getBalancerComputeThreads()
);
}
}
@@ -92,18 +84,16 @@ public class SegmentLoadingConfig
int replicationThrottleLimit,
int maxReplicaAssignmentsInRun,
int maxLifetimeInLoadQueue,
- int maxSegmentsToMove,
- int percentDecommSegmentsToMove,
- boolean useRoundRobinSegmentAssignment
+ boolean useRoundRobinSegmentAssignment,
+ int balancerComputeThreads
)
{
this.maxSegmentsInLoadQueue = maxSegmentsInLoadQueue;
this.replicationThrottleLimit = replicationThrottleLimit;
this.maxReplicaAssignmentsInRun = maxReplicaAssignmentsInRun;
this.maxLifetimeInLoadQueue = maxLifetimeInLoadQueue;
- this.maxSegmentsToMove = maxSegmentsToMove;
- this.percentDecommSegmentsToMove = percentDecommSegmentsToMove;
this.useRoundRobinSegmentAssignment = useRoundRobinSegmentAssignment;
+ this.balancerComputeThreads = balancerComputeThreads;
}
public int getMaxSegmentsInLoadQueue()
@@ -111,11 +101,6 @@ public class SegmentLoadingConfig
return maxSegmentsInLoadQueue;
}
- public int getMaxSegmentsToMove()
- {
- return maxSegmentsToMove;
- }
-
public int getReplicationThrottleLimit()
{
return replicationThrottleLimit;
@@ -136,8 +121,35 @@ public class SegmentLoadingConfig
return maxReplicaAssignmentsInRun;
}
- public int getPercentDecommSegmentsToMove()
+ public int getBalancerComputeThreads()
{
- return percentDecommSegmentsToMove;
+ return balancerComputeThreads;
+ }
+
+ /**
+ * Computes the number of threads to be used in the balancing executor.
+ * The number of used segments in a cluster is generally a good indicator of
+ * the cluster size and has been used here as a proxy for the actual number
of
+ * segments that would be involved in cost computations.
+ * <p>
+ * The number of threads increases by 1 first for every 50k segments, then
for
+ * every 75k segments and so on.
+ *
+ * @return Number of {@code balancerComputeThreads} in the range [1, 8].
+ */
+ public static int computeNumBalancerThreads(int numUsedSegments)
+ {
+ // Add an extra thread when numUsedSegments increases by a step
+ final int[] stepValues = {50, 50, 75, 75, 100, 100, 150, 150};
+
+ int remainder = numUsedSegments / 1000;
+ for (int step = 0; step < stepValues.length; ++step) {
+ remainder -= stepValues[step];
+ if (remainder < 0) {
+ return step + 1;
+ }
+ }
+
+ return stepValues.length;
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
index 6d0e1b8ae8..cbee974452 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
@@ -255,6 +255,8 @@ public class CoordinatorRunStats
{
if (debugDimensions.isEmpty()) {
return false;
+ } else if (rowKey.getValues().isEmpty()) {
+ return true;
}
for (Map.Entry<Dimension, String> entry : rowKey.getValues().entrySet()) {
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
index d1df912698..63a0944ed7 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsProfiler.java
@@ -38,6 +38,7 @@ import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
+import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Before;
@@ -142,7 +143,7 @@ public class BalanceSegmentsProfiler
.withDatabaseRuleManager(manager)
.build();
- BalanceSegments tester = new BalanceSegments();
+ BalanceSegments tester = new BalanceSegments(Duration.standardMinutes(1));
RunRules runner = new RunRules(Set::size);
watch.start();
DruidCoordinatorRuntimeParams balanceParams = tester.run(params);
@@ -188,7 +189,7 @@ public class BalanceSegmentsProfiler
.withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE).build())
.withSegmentAssignerUsing(loadQueueManager)
.build();
- BalanceSegments tester = new BalanceSegments();
+ BalanceSegments tester = new BalanceSegments(Duration.standardMinutes(1));
watch.start();
DruidCoordinatorRuntimeParams balanceParams = tester.run(params);
System.out.println(watch.stop());
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index cf53708af0..0e75f95a1f 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -51,6 +51,7 @@ import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.ServerType;
import
org.apache.druid.server.coordinator.balancer.CostBalancerStrategyFactory;
+import
org.apache.druid.server.coordinator.balancer.RandomBalancerStrategyFactory;
import org.apache.druid.server.coordinator.duty.CompactSegments;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDuty;
import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroup;
@@ -585,25 +586,12 @@ public class DruidCoordinatorTest extends CuratorTestBase
@Test
public void testBalancerThreadNumber()
{
- CoordinatorDynamicConfig dynamicConfig =
EasyMock.createNiceMock(CoordinatorDynamicConfig.class);
-
EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(5).times(2);
-
EasyMock.expect(dynamicConfig.getBalancerComputeThreads()).andReturn(10).once();
-
- JacksonConfigManager configManager =
EasyMock.createNiceMock(JacksonConfigManager.class);
- EasyMock.expect(
- configManager.watch(
- EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
- EasyMock.anyObject(Class.class),
- EasyMock.anyObject()
- )
- ).andReturn(new AtomicReference<>(dynamicConfig)).anyTimes();
-
ScheduledExecutorFactory scheduledExecutorFactory =
EasyMock.createNiceMock(ScheduledExecutorFactory.class);
- EasyMock.replay(configManager, dynamicConfig, scheduledExecutorFactory);
+ EasyMock.replay(scheduledExecutorFactory);
DruidCoordinator c = new DruidCoordinator(
druidCoordinatorConfig,
- configManager,
+ EasyMock.createNiceMock(JacksonConfigManager.class),
null,
null,
null,
@@ -617,7 +605,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
null,
null,
new CoordinatorCustomDutyGroups(ImmutableSet.of()),
- null,
+ new RandomBalancerStrategyFactory(),
null,
null,
null
@@ -628,20 +616,20 @@ public class DruidCoordinatorTest extends CuratorTestBase
Assert.assertNull(c.getBalancerExec());
// first initialization
- c.initBalancerExecutor();
+ c.createBalancerStrategy(5);
Assert.assertEquals(5, c.getCachedBalancerThreadNumber());
ListeningExecutorService firstExec = c.getBalancerExec();
Assert.assertNotNull(firstExec);
// second initialization, expect no changes as cachedBalancerThreadNumber
is not changed
- c.initBalancerExecutor();
+ c.createBalancerStrategy(5);
Assert.assertEquals(5, c.getCachedBalancerThreadNumber());
ListeningExecutorService secondExec = c.getBalancerExec();
Assert.assertNotNull(secondExec);
Assert.assertSame(firstExec, secondExec);
// third initialization, expect executor recreated as
cachedBalancerThreadNumber is changed to 10
- c.initBalancerExecutor();
+ c.createBalancerStrategy(10);
Assert.assertEquals(10, c.getCachedBalancerThreadNumber());
ListeningExecutorService thirdExec = c.getBalancerExec();
Assert.assertNotNull(thirdExec);
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
index 11e36dea18..70fb2dd0b6 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/ServerHolderTest.java
@@ -193,8 +193,6 @@ public class ServerHolderTest
);
Assert.assertTrue(h1.isServingSegment(SEGMENTS.get(0)));
Assert.assertFalse(h1.isServingSegment(SEGMENTS.get(1)));
- Assert.assertTrue(h1.hasSegmentLoaded(SEGMENTS.get(0).getId()));
- Assert.assertFalse(h1.hasSegmentLoaded(SEGMENTS.get(1).getId()));
Assert.assertFalse(h1.isLoadQueueFull());
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
index e4b3715960..576dfbb03d 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
@@ -31,8 +31,6 @@ import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
-import org.apache.druid.server.coordinator.stats.Dimension;
-import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.junit.After;
@@ -317,7 +315,7 @@ public class CostBalancerStrategyTest
}
@Test
- public void testGetAndResetStats()
+ public void testGetStats()
{
final ServerHolder serverA = new ServerHolder(
createHistorical().toImmutableDruidServer(),
@@ -332,20 +330,13 @@ public class CostBalancerStrategyTest
// Verify that computation stats have been tracked
strategy.findServersToLoadSegment(segment, Arrays.asList(serverA,
serverB));
- CoordinatorRunStats computeStats = strategy.getAndResetStats();
+ CoordinatorRunStats computeStats = strategy.getStats();
- final RowKey rowKey = RowKey.with(Dimension.DATASOURCE, DS_WIKI)
- .with(Dimension.DESCRIPTION, "LOAD")
- .and(Dimension.TIER, "hot");
- Assert.assertEquals(1L, computeStats.get(Stats.Balancer.COMPUTATION_COUNT,
rowKey));
+ Assert.assertEquals(1L,
computeStats.get(Stats.Balancer.COMPUTATION_COUNT));
- long computeTime = computeStats.get(Stats.Balancer.COMPUTATION_TIME,
rowKey);
+ long computeTime = computeStats.get(Stats.Balancer.COMPUTATION_TIME);
Assert.assertTrue(computeTime >= 0 && computeTime <= 100);
Assert.assertFalse(computeStats.hasStat(Stats.Balancer.COMPUTATION_ERRORS));
-
- // Verify that stats have been reset
- computeStats = strategy.getAndResetStats();
- Assert.assertEquals(0, computeStats.rowCount());
}
@Test
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculatorTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculatorTest.java
new file mode 100644
index 0000000000..58c1cde409
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculatorTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.balancer;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
+import org.apache.druid.server.coordinator.loading.TestLoadQueuePeon;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Duration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class SegmentToMoveCalculatorTest
+{
+
+ private static final Duration DEFAULT_COORDINATOR_PERIOD =
Duration.standardMinutes(1);
+
+ /**
+ * 100 days x 100 partitions = 10,000 segments.
+ */
+ private static final List<DataSegment> WIKI_SEGMENTS
+ = CreateDataSegments.ofDatasource("wiki")
+ .forIntervals(100, Granularities.DAY)
+ .withNumPartitions(100)
+ .eachOfSizeInMb(500);
+
+ /**
+ * 10 days * 1 partitions = 10 segments.
+ */
+ private static final List<DataSegment> KOALA_SEGMENTS
+ = CreateDataSegments.ofDatasource("koala")
+ .forIntervals(10, Granularities.DAY)
+ .eachOfSizeInMb(500);
+
+ private static final String TIER = "tier1";
+
+ @Test
+ public void testMaxSegmentsToMove1Thread()
+ {
+ Assert.assertEquals(0, computeMaxSegmentsToMove(0, 1));
+ Assert.assertEquals(50, computeMaxSegmentsToMove(50, 1));
+ Assert.assertEquals(100, computeMaxSegmentsToMove(100, 1));
+
+ Assert.assertEquals(100, computeMaxSegmentsToMove(512, 1));
+ Assert.assertEquals(200, computeMaxSegmentsToMove(1_024, 1));
+ Assert.assertEquals(300, computeMaxSegmentsToMove(1_536, 1));
+
+ Assert.assertEquals(1_900, computeMaxSegmentsToMove(10_000, 1));
+ Assert.assertEquals(9_700, computeMaxSegmentsToMove(50_000, 1));
+ Assert.assertEquals(19_500, computeMaxSegmentsToMove(100_000, 1));
+
+ Assert.assertEquals(10_000, computeMaxSegmentsToMove(200_000, 1));
+ Assert.assertEquals(4_000, computeMaxSegmentsToMove(500_000, 1));
+ Assert.assertEquals(2_000, computeMaxSegmentsToMove(1_000_000, 1));
+ }
+
+ @Test
+ public void testMaxSegmentsToMoveIncreasesWithCoordinatorPeriod()
+ {
+ Assert.assertEquals(5_000, computeMaxSegmentsToMoveInPeriod(200_000,
Duration.millis(30_000)));
+ Assert.assertEquals(10_000, computeMaxSegmentsToMoveInPeriod(200_000,
Duration.millis(60_000)));
+ Assert.assertEquals(15_000, computeMaxSegmentsToMoveInPeriod(200_000,
Duration.millis(90_000)));
+ Assert.assertEquals(20_000, computeMaxSegmentsToMoveInPeriod(200_000,
Duration.millis(120_000)));
+
+ Assert.assertEquals(2_000, computeMaxSegmentsToMoveInPeriod(500_000,
Duration.millis(30_000)));
+ Assert.assertEquals(4_000, computeMaxSegmentsToMoveInPeriod(500_000,
Duration.millis(60_000)));
+ Assert.assertEquals(6_000, computeMaxSegmentsToMoveInPeriod(500_000,
Duration.millis(90_000)));
+ Assert.assertEquals(8_000, computeMaxSegmentsToMoveInPeriod(500_000,
Duration.millis(120_000)));
+ }
+
+ @Test
+ public void testMaxSegmentsToMove8Threads()
+ {
+ Assert.assertEquals(0, computeMaxSegmentsToMove(0, 8));
+ Assert.assertEquals(50, computeMaxSegmentsToMove(50, 8));
+ Assert.assertEquals(100, computeMaxSegmentsToMove(100, 8));
+
+ Assert.assertEquals(100, computeMaxSegmentsToMove(512, 8));
+ Assert.assertEquals(200, computeMaxSegmentsToMove(1_024, 8));
+ Assert.assertEquals(300, computeMaxSegmentsToMove(1_536, 8));
+
+ Assert.assertEquals(33_000, computeMaxSegmentsToMove(500_000, 8));
+ Assert.assertEquals(16_000, computeMaxSegmentsToMove(1_000_000, 8));
+ Assert.assertEquals(8_000, computeMaxSegmentsToMove(2_000_000, 8));
+ Assert.assertEquals(3_000, computeMaxSegmentsToMove(5_000_000, 8));
+ Assert.assertEquals(1_000, computeMaxSegmentsToMove(10_000_000, 8));
+ }
+
+ @Test
+ public void testMaxSegmentsToMoveWithComputedNumThreads()
+ {
+ Assert.assertEquals(1_900, computeNumThreadsAndMaxToMove(10_000));
+ Assert.assertEquals(9_700, computeNumThreadsAndMaxToMove(50_000));
+
+ Assert.assertEquals(19_500, computeNumThreadsAndMaxToMove(100_000));
+ Assert.assertEquals(39_000, computeNumThreadsAndMaxToMove(200_000));
+ Assert.assertEquals(29_000, computeNumThreadsAndMaxToMove(500_000));
+
+ Assert.assertEquals(16_000, computeNumThreadsAndMaxToMove(1_000_000));
+ Assert.assertEquals(8_000, computeNumThreadsAndMaxToMove(2_000_000));
+ Assert.assertEquals(1_000, computeNumThreadsAndMaxToMove(10_000_000));
+ }
+
+ @Test
+ public void testMinSegmentsToMove()
+ {
+ Assert.assertEquals(0, computeMinSegmentsToMove(0));
+ Assert.assertEquals(50, computeMinSegmentsToMove(50));
+
+ Assert.assertEquals(100, computeMinSegmentsToMove(100));
+ Assert.assertEquals(100, computeMinSegmentsToMove(1_000));
+
+ Assert.assertEquals(100, computeMinSegmentsToMove(20_000));
+ Assert.assertEquals(100, computeMinSegmentsToMove(50_000));
+ Assert.assertEquals(100, computeMinSegmentsToMove(100_000));
+ Assert.assertEquals(300, computeMinSegmentsToMove(200_000));
+ Assert.assertEquals(700, computeMinSegmentsToMove(500_000));
+ Assert.assertEquals(1_500, computeMinSegmentsToMove(1_000_000));
+ Assert.assertEquals(15_200, computeMinSegmentsToMove(10_000_000));
+ }
+
+ @Test
+ public void testMinSegmentsToMoveIncreasesInSteps()
+ {
+ Assert.assertEquals(100, computeMinSegmentsToMove(131_071));
+ Assert.assertEquals(200, computeMinSegmentsToMove(131_072));
+
+ Assert.assertEquals(500, computeMinSegmentsToMove(393_215));
+ Assert.assertEquals(600, computeMinSegmentsToMove(393_216));
+
+ Assert.assertEquals(900, computeMinSegmentsToMove(655_359));
+ Assert.assertEquals(1000, computeMinSegmentsToMove(655_360));
+
+ Assert.assertEquals(9_900, computeMinSegmentsToMove(6_553_599));
+ Assert.assertEquals(10_000, computeMinSegmentsToMove(6_553_600));
+ }
+
+ @Test
+ public void testMinSegmentsArePickedForMoveWhenNoSkew()
+ {
+ final List<ServerHolder> historicals = Arrays.asList(
+ createServer("A", WIKI_SEGMENTS),
+ createServer("B", WIKI_SEGMENTS)
+ );
+
+ final int minSegmentsToMove =
SegmentToMoveCalculator.computeMinSegmentsToMoveInTier(20_000);
+ Assert.assertEquals(100, minSegmentsToMove);
+
+ final int segmentsToMoveToFixSkew = SegmentToMoveCalculator
+ .computeNumSegmentsToMoveToBalanceTier(TIER, historicals);
+ Assert.assertEquals(0, segmentsToMoveToFixSkew);
+
+ // Find segmentsToMove with no limit on maxSegmentsToMove
+ final int segmentsToMove = SegmentToMoveCalculator
+ .computeNumSegmentsToMoveInTier(TIER, historicals, Integer.MAX_VALUE);
+
+ Assert.assertEquals(minSegmentsToMove, segmentsToMove);
+ }
+
+ @Test
+ public void testHalfSegmentsArePickedForMoveWhenFullSkew()
+ {
+ final List<ServerHolder> historicals = Arrays.asList(
+ createServer("A", WIKI_SEGMENTS),
+ createServer("B", Collections.emptyList())
+ );
+
+ final int minSegmentsToMove =
SegmentToMoveCalculator.computeMinSegmentsToMoveInTier(10_000);
+ Assert.assertEquals(100, minSegmentsToMove);
+
+ final int segmentsToMoveToFixSkew = SegmentToMoveCalculator
+ .computeNumSegmentsToMoveToBalanceTier(TIER, historicals);
+ Assert.assertEquals(5_000, segmentsToMoveToFixSkew);
+
+ // Find segmentsToMove with no limit on maxSegmentsToMove
+ final int segmentsToMove = SegmentToMoveCalculator
+ .computeNumSegmentsToMoveInTier(TIER, historicals, Integer.MAX_VALUE);
+
+ Assert.assertEquals(segmentsToMoveToFixSkew, segmentsToMove);
+ }
+
+ @Test
+ public void testDatasourceWithLargestGapDeterminesNumToBalanceCounts()
+ {
+ // Both servers have all koala segments but only A has wiki segments
+ List<DataSegment> segmentsForServerA = new ArrayList<>(WIKI_SEGMENTS);
+ segmentsForServerA.addAll(KOALA_SEGMENTS);
+
+ final List<ServerHolder> historicals = Arrays.asList(
+ createServer("A", segmentsForServerA),
+ createServer("B", KOALA_SEGMENTS)
+ );
+
+ // Verify that half the wiki segments need to be moved for balance
+ int numToMoveToBalanceCount = SegmentToMoveCalculator
+ .computeSegmentsToMoveToBalanceCountsPerDatasource(TIER, historicals);
+ Assert.assertEquals(WIKI_SEGMENTS.size() / 2, numToMoveToBalanceCount);
+ }
+
+ private static int computeMaxSegmentsToMove(int totalSegments, int
numThreads)
+ {
+ return SegmentToMoveCalculator.computeMaxSegmentsToMovePerTier(
+ totalSegments,
+ numThreads,
+ DEFAULT_COORDINATOR_PERIOD
+ );
+ }
+
+ private static int computeMaxSegmentsToMoveInPeriod(int totalSegments,
Duration coordinatorPeriod)
+ {
+ return
SegmentToMoveCalculator.computeMaxSegmentsToMovePerTier(totalSegments, 1,
coordinatorPeriod);
+ }
+
+ private static int computeNumThreadsAndMaxToMove(int totalSegments)
+ {
+ return SegmentToMoveCalculator.computeMaxSegmentsToMovePerTier(
+ totalSegments,
+ SegmentLoadingConfig.computeNumBalancerThreads(totalSegments),
+ DEFAULT_COORDINATOR_PERIOD
+ );
+ }
+
+ private static int computeMinSegmentsToMove(int totalSegmentsInTier)
+ {
+ return
SegmentToMoveCalculator.computeMinSegmentsToMoveInTier(totalSegmentsInTier);
+ }
+
+ private static ServerHolder createServer(String name, List<DataSegment>
segments)
+ {
+ final DruidServer server
+ = new DruidServer(name, name, null, 10L << 30, ServerType.HISTORICAL,
"tier1", 1);
+ segments.forEach(server::addDataSegment);
+ return new ServerHolder(server.toImmutableDruidServer(), new
TestLoadQueuePeon());
+ }
+}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
index c608f58d9a..aada7b3214 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
@@ -38,6 +38,7 @@ import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;
+import org.joda.time.Duration;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
@@ -378,7 +379,7 @@ public class BalanceSegmentsTest
private CoordinatorRunStats runBalancer(DruidCoordinatorRuntimeParams params)
{
- params = new BalanceSegments().run(params);
+ params = new BalanceSegments(Duration.standardMinutes(1)).run(params);
if (params == null) {
Assert.fail("BalanceSegments duty returned null params");
return new CoordinatorRunStats();
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfigTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfigTest.java
new file mode 100644
index 0000000000..947ac45ca7
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfigTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.loading;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verifies that the number of balancer threads computed by
+ * {@link SegmentLoadingConfig#computeNumBalancerThreads} grows with the
+ * number of used segments and is capped at 8 for very large clusters.
+ */
+public class SegmentLoadingConfigTest
+{
+
+ @Test
+ public void testComputeNumBalancerThreads()
+ {
+ Assert.assertEquals(1, computeBalancerThreads(0));
+ Assert.assertEquals(1, computeBalancerThreads(30_000));
+ Assert.assertEquals(2, computeBalancerThreads(50_000));
+ Assert.assertEquals(3, computeBalancerThreads(100_000));
+
+ Assert.assertEquals(4, computeBalancerThreads(175_000));
+ Assert.assertEquals(5, computeBalancerThreads(250_000));
+ Assert.assertEquals(6, computeBalancerThreads(350_000));
+ Assert.assertEquals(7, computeBalancerThreads(450_000));
+ Assert.assertEquals(8, computeBalancerThreads(600_000));
+
+ // Thread count stays capped at 8 no matter how many segments are used
+ Assert.assertEquals(8, computeBalancerThreads(1_000_000));
+ Assert.assertEquals(8, computeBalancerThreads(10_000_000));
+ }
+
+ private int computeBalancerThreads(int numUsedSegments)
+ {
+ return SegmentLoadingConfig.computeNumBalancerThreads(numUsedSegments);
+ }
+
+}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
index 984fe217af..bd1af4a084 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
@@ -217,8 +217,8 @@ public abstract class CoordinatorSimulationBaseTest
implements
{
static final String ASSIGNED_COUNT = "segment/assigned/count";
static final String MOVED_COUNT = "segment/moved/count";
+ static final String MOVE_SKIPPED = "segment/moveSkipped/count";
static final String DROPPED_COUNT = "segment/dropped/count";
- static final String OVERSHADOWED_COUNT = "segment/overshadowed/count";
static final String DELETED_COUNT = "segment/deleted/count";
static final String LOAD_QUEUE_COUNT = "segment/loadQueue/count";
static final String DROP_QUEUE_COUNT = "segment/dropQueue/count";
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index c5d5d94293..e40bf8ee16 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -74,7 +74,6 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class CoordinatorSimulationBuilder
{
- private static final long DEFAULT_COORDINATOR_PERIOD = 100L;
private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper()
.setInjectableValues(
new InjectableValues.Std().addValue(
@@ -463,8 +462,8 @@ public class CoordinatorSimulationBuilder
this.coordinatorConfig = new TestDruidCoordinatorConfig.Builder()
.withCoordinatorStartDelay(new Duration(1L))
- .withCoordinatorPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD))
- .withCoordinatorKillPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD))
+ .withCoordinatorPeriod(Duration.standardMinutes(1))
+ .withCoordinatorKillPeriod(Duration.millis(100))
.withLoadQueuePeonType("http")
.withCoordinatorKillIgnoreDurationToRetain(false)
.build();
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
index 383b5fe3c6..141c6ba7a1 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
@@ -26,6 +26,8 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
* Coordinator simulation test to verify behaviour of segment balancing.
@@ -229,4 +231,39 @@ public class SegmentBalancingTest extends
CoordinatorSimulationBaseTest
Assert.assertTrue(getValue(Metric.MOVED_COUNT, null).intValue() > 0);
}
+ // Simulation: a skewed cluster (all segments on the first 10 historicals)
+ // should move the max allowed segments once 10 empty historicals are added.
+ @Test(timeout = 60000L)
+ public void testMaxSegmentsAreMovedWhenClusterIsSkewed()
+ {
+ // Add 10 historicals of size 1 TB each
+ // NOTE(review): 1_000_000 presumably represents 1 TB in the simulation's
+ // nominal size units — confirm against the sizes used by Segments.KOALA_100X100D
+ final long size1TB = 1_000_000;
+ List<DruidServer> historicals
+ = IntStream.range(0, 10)
+ .mapToObj(i -> createHistorical(i + 1, Tier.T1, size1TB))
+ .collect(Collectors.toList());
+
+ CoordinatorSimulation sim =
+ CoordinatorSimulation.builder()
+ .withSegments(Segments.KOALA_100X100D)
+ .withServers(historicals)
+ .withRules(DS.KOALA, Load.on(Tier.T1,
1).forever())
+ .build();
+
+ startSimulation(sim);
+
+ // Run 1: All segments are assigned to the 10 historicals
+ // Nothing is moved yet; 100 moves are skipped (cluster not yet balanced-ready)
+ runCoordinatorCycle();
+ verifyValue(Metric.ASSIGNED_COUNT, 10_000L);
+ verifyNotEmitted(Metric.MOVED_COUNT);
+ verifyValue(Metric.MOVE_SKIPPED, 100L);
+
+ // Run 2: Add 10 more historicals, some segments are moved to them
+ for (int i = 11; i <= 20; ++i) {
+ addServer(createHistorical(i, Tier.T1, size1TB));
+ }
+
+ // NOTE(review): 500 is presumably the computed maxSegmentsToMove for this
+ // cluster size — confirm against SegmentToMoveCalculator's limits
+ runCoordinatorCycle();
+ verifyValue(Metric.MOVED_COUNT, 500L);
+ verifyNotEmitted(Metric.MOVE_SKIPPED);
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]