This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 4bd6bd0d4f Improve CostBalancerStrategy, deprecate cachingCost (#14484)
4bd6bd0d4f is described below
commit 4bd6bd0d4f61effa392cb5736d392ca12f7f3b40
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Jun 27 13:23:29 2023 +0530
Improve CostBalancerStrategy, deprecate cachingCost (#14484)
Changes to `cost` strategy:
- In every `ServerHolder`, track the number of segments per datasource per
interval
- Perform the cost computation for a given interval just once, and then
multiply it by a constant factor to account for the total number of segments
in that interval
- Do not perform joint cost computations with segments that lie outside the
compute interval (± 45 days) of the segment whose placement cost is being
computed
- Remove the `segment/cost/*` metrics, as computing them was prohibitively
expensive: they required the pairwise joint cost of every pair of segments
on every server. Turning on these metrics (by setting `emitBalancingStats`
to true) has often caused the coordinator to be stuck for hours.
Moreover, they were hard to interpret and did not provide any meaningful
insight into the state of a Druid cluster.
- Add new, simpler metrics `segment/balancer/compute/*` to track the time,
count and errors of cost computations.
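Other changes:
- Deprecate the `cachingCost` strategy: a warning is now logged when it is
created, since it can lead to unbalanced clusters. Use `cost` instead.
- Remove flaky test from `CostBalancerStrategyTest`.
- Add tests to verify that the computed cost has remained unchanged.
- Remove usages of mock `BalancerStrategy` from `LoadRuleTest` and
`BalanceSegmentsTest`.
- Clean up the `BalancerStrategy` interface.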
---
.../druid/server/coordinator/DruidCoordinator.java | 24 +-
.../coordinator/SegmentCountsPerInterval.java | 67 ++++++
.../druid/server/coordinator/ServerHolder.java | 74 +++---
.../coordinator/balancer/BalancerStrategy.java | 56 ++---
.../balancer/CachingCostBalancerStrategy.java | 9 +-
.../CachingCostBalancerStrategyFactory.java | 4 +
.../coordinator/balancer/CostBalancerStrategy.java | 266 +++++++++++----------
.../DiskNormalizedCostBalancerStrategy.java | 7 +-
.../balancer/RandomBalancerStrategy.java | 14 +-
.../coordinator/balancer/TierSegmentBalancer.java | 19 +-
.../server/coordinator/duty/BalanceSegments.java | 6 +
.../duty/CollectSegmentAndServerStats.java | 26 +-
.../server/coordinator/duty/CompactSegments.java | 3 +-
.../duty/MarkOvershadowedSegmentsAsUnused.java | 5 +-
.../druid/server/coordinator/duty/RunRules.java | 2 +-
.../coordinator/loading/HttpLoadQueuePeon.java | 6 +-
.../coordinator/loading/SegmentLoadingConfig.java | 2 +-
.../loading/StrategicSegmentAssigner.java | 26 +-
.../coordinator/stats/CoordinatorRunStats.java | 21 +-
.../druid/server/coordinator/stats/RowKey.java | 23 +-
.../druid/server/coordinator/stats/Stats.java | 12 +-
.../coordinator/CoordinatorRunStatsTest.java | 16 +-
.../balancer/CachingCostBalancerStrategyTest.java | 3 +-
.../balancer/CostBalancerStrategyTest.java | 137 ++++++-----
.../coordinator/duty/BalanceSegmentsTest.java | 81 ++-----
.../coordinator/duty/CompactSegmentsTest.java | 2 +-
.../server/coordinator/duty/RunRulesTest.java | 13 +-
.../server/coordinator/rules/LoadRuleTest.java | 158 +++---------
28 files changed, 515 insertions(+), 567 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 0c5e79bdcb..c9d334968e 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -20,6 +20,7 @@
package org.apache.druid.server.coordinator;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
@@ -98,6 +99,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -672,7 +674,7 @@ public class DruidCoordinator
{
try {
log.info("Starting coordinator run for group [%s]", dutyGroupName);
- final long globalStart = System.currentTimeMillis();
+ final Stopwatch groupRunTime = Stopwatch.createStarted();
synchronized (lock) {
if (!coordLeaderSelector.isLeader()) {
@@ -719,23 +721,25 @@ public class DruidCoordinator
log.info("Coordination has been paused. Duties will not run until
coordination is resumed.");
}
+ final Stopwatch dutyRunTime = Stopwatch.createUnstarted();
for (CoordinatorDuty duty : duties) {
// Don't read state and run state in the same duty otherwise racy
conditions may exist
if (!coordinationPaused
&& coordLeaderSelector.isLeader()
&& startingLeaderCounter == coordLeaderSelector.localTerm()) {
- final long start = System.currentTimeMillis();
+ dutyRunTime.reset().start();
params = duty.run(params);
- final long end = System.currentTimeMillis();
+ dutyRunTime.stop();
final String dutyName = duty.getClass().getName();
if (params == null) {
log.info("Stopping run for group [%s] on request of duty [%s].",
dutyGroupName, dutyName);
return;
} else {
- final RowKey rowKey = RowKey.builder().add(Dimension.DUTY,
dutyName).build();
-
params.getCoordinatorStats().add(Stats.CoordinatorRun.DUTY_RUN_TIME, rowKey,
end - start);
+ final RowKey rowKey = RowKey.of(Dimension.DUTY, dutyName);
+ final long dutyRunMillis =
dutyRunTime.elapsed(TimeUnit.MILLISECONDS);
+
params.getCoordinatorStats().add(Stats.CoordinatorRun.DUTY_RUN_TIME, rowKey,
dutyRunMillis);
}
}
}
@@ -745,9 +749,9 @@ public class DruidCoordinator
if (allStats.rowCount() > 0) {
final AtomicInteger emittedCount = new AtomicInteger();
allStats.forEachStat(
- (dimensionValues, stat, value) -> {
+ (stat, dimensions, value) -> {
if (stat.shouldEmit()) {
- emitStat(stat, dimensionValues, value);
+ emitStat(stat, dimensions.getValues(), value);
emittedCount.incrementAndGet();
}
}
@@ -760,7 +764,7 @@ public class DruidCoordinator
}
// Emit the runtime of the full DutiesRunnable
- final long runMillis = System.currentTimeMillis() - globalStart;
+ final long runMillis =
groupRunTime.stop().elapsed(TimeUnit.MILLISECONDS);
emitStat(Stats.CoordinatorRun.GROUP_RUN_TIME, Collections.emptyMap(),
runMillis);
log.info("Finished coordinator run for group [%s] in [%d] ms",
dutyGroupName, runMillis);
}
@@ -771,10 +775,6 @@ public class DruidCoordinator
private void emitStat(CoordinatorStat stat, Map<Dimension, String>
dimensionValues, long value)
{
- if (stat.equals(Stats.Balancer.NORMALIZED_COST_X_1000)) {
- value = value / 1000;
- }
-
ServiceMetricEvent.Builder eventBuilder = new
ServiceMetricEvent.Builder()
.setDimension(Dimension.DUTY_GROUP.reportedName(), dutyGroupName);
dimensionValues.forEach(
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java
b/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java
new file mode 100644
index 0000000000..ea1f81ee6d
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/server/coordinator/SegmentCountsPerInterval.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntMaps;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Maintains a count of segments for each datasource and interval.
+ */
+public class SegmentCountsPerInterval
+{
+ private final Map<String, Object2IntMap<Interval>>
+ datasourceIntervalToSegmentCount = new HashMap<>();
+ private final Object2IntMap<Interval> intervalToTotalSegmentCount = new
Object2IntOpenHashMap<>();
+
+ public void addSegment(DataSegment segment)
+ {
+ updateCountInInterval(segment, 1);
+ }
+
+ public void removeSegment(DataSegment segment)
+ {
+ updateCountInInterval(segment, -1);
+ }
+
+ public Object2IntMap<Interval> getIntervalToSegmentCount(String datasource)
+ {
+ return datasourceIntervalToSegmentCount.getOrDefault(datasource,
Object2IntMaps.emptyMap());
+ }
+
+ public Object2IntMap<Interval> getIntervalToTotalSegmentCount()
+ {
+ return intervalToTotalSegmentCount;
+ }
+
+ private void updateCountInInterval(DataSegment segment, int delta)
+ {
+ intervalToTotalSegmentCount.mergeInt(segment.getInterval(), delta,
Integer::sum);
+ datasourceIntervalToSegmentCount
+ .computeIfAbsent(segment.getDataSource(), ds -> new
Object2IntOpenHashMap<>())
+ .mergeInt(segment.getInterval(), delta, Integer::sum);
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
index 79f2c38ab2..7947f1c1b3 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java
@@ -32,11 +32,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -75,11 +73,7 @@ public class ServerHolder implements Comparable<ServerHolder>
*/
private final Map<DataSegment, SegmentAction> queuedSegments = new
HashMap<>();
- /**
- * Segments that are expected to be loaded on this server once all the
- * operations in progress have completed.
- */
- private final Set<DataSegment> projectedSegments = new HashSet<>();
+ private final SegmentCountsPerInterval projectedSegments = new
SegmentCountsPerInterval();
public ServerHolder(ImmutableDruidServer server, LoadQueuePeon peon)
{
@@ -133,31 +127,31 @@ public class ServerHolder implements
Comparable<ServerHolder>
AtomicInteger loadingReplicaCount
)
{
- projectedSegments.addAll(server.iterateAllSegments());
+ for (DataSegment segment : server.iterateAllSegments()) {
+ projectedSegments.addSegment(segment);
+ }
final List<SegmentHolder> expiredSegments = new ArrayList<>();
- peon.getSegmentsInQueue().forEach(
- (holder) -> {
- int runsInQueue = holder.incrementAndGetRunsInQueue();
- if (runsInQueue > maxLifetimeInQueue) {
- expiredSegments.add(holder);
- }
-
- final SegmentAction action = holder.getAction();
- addToQueuedSegments(holder.getSegment(), simplify(action));
-
- if (action == SegmentAction.MOVE_TO) {
- movingSegmentCount.incrementAndGet();
- }
- if (action == SegmentAction.REPLICATE) {
- loadingReplicaCount.incrementAndGet();
- }
- }
- );
-
- peon.getSegmentsMarkedToDrop().forEach(
- segment -> addToQueuedSegments(segment, SegmentAction.MOVE_FROM)
- );
+ for (SegmentHolder holder : peon.getSegmentsInQueue()) {
+ int runsInQueue = holder.incrementAndGetRunsInQueue();
+ if (runsInQueue > maxLifetimeInQueue) {
+ expiredSegments.add(holder);
+ }
+
+ final SegmentAction action = holder.getAction();
+ addToQueuedSegments(holder.getSegment(), simplify(action));
+
+ if (action == SegmentAction.MOVE_TO) {
+ movingSegmentCount.incrementAndGet();
+ }
+ if (action == SegmentAction.REPLICATE) {
+ loadingReplicaCount.incrementAndGet();
+ }
+ }
+
+ for (DataSegment segment : peon.getSegmentsMarkedToDrop()) {
+ addToQueuedSegments(segment, SegmentAction.MOVE_FROM);
+ }
if (!expiredSegments.isEmpty()) {
List<SegmentHolder> expiredSegmentsSubList =
@@ -251,11 +245,21 @@ public class ServerHolder implements
Comparable<ServerHolder>
* Segments that are expected to be loaded on this server once all the
* operations in progress have completed.
*/
- public Set<DataSegment> getProjectedSegments()
+ public SegmentCountsPerInterval getProjectedSegments()
{
return projectedSegments;
}
+ public boolean isProjectedSegment(DataSegment segment)
+ {
+ SegmentAction action = getActionOnSegment(segment);
+ if (action == null) {
+ return hasSegmentLoaded(segment.getId());
+ } else {
+ return action.isLoad();
+ }
+ }
+
/**
* Segments that are currently in the queue for being loaded on this server.
* This does not include segments that are being moved to this server.
@@ -362,10 +366,10 @@ public class ServerHolder implements
Comparable<ServerHolder>
// Add to projected if load is started, remove from projected if drop has
started
if (action.isLoad()) {
- projectedSegments.add(segment);
+ projectedSegments.addSegment(segment);
sizeOfLoadingSegments += segment.getSize();
} else {
- projectedSegments.remove(segment);
+ projectedSegments.removeSegment(segment);
if (action == SegmentAction.DROP) {
sizeOfDroppingSegments += segment.getSize();
}
@@ -379,10 +383,10 @@ public class ServerHolder implements
Comparable<ServerHolder>
queuedSegments.remove(segment);
if (action.isLoad()) {
- projectedSegments.remove(segment);
+ projectedSegments.removeSegment(segment);
sizeOfLoadingSegments -= segment.getSize();
} else {
- projectedSegments.add(segment);
+ projectedSegments.addSegment(segment);
if (action == SegmentAction.DROP) {
sizeOfDroppingSegments -= segment.getSize();
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java
index 7f27648d3f..e133430b1b 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/BalancerStrategy.java
@@ -20,70 +20,60 @@
package org.apache.druid.server.coordinator.balancer;
import org.apache.druid.server.coordinator.ServerHolder;
-import org.apache.druid.server.coordinator.duty.BalanceSegments;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.List;
-import java.util.NavigableSet;
/**
- * This interface describes the coordinator balancing strategy, which is
responsible for making decisions on where
- * to place {@link DataSegment}s on historical servers (described by {@link
ServerHolder}). The balancing strategy
- * is used by {@link org.apache.druid.server.coordinator.rules.LoadRule} to
assign and drop segments, and by
- * {@link BalanceSegments} to migrate segments between historicals.
+ * Segment balancing strategy, used in every coordinator run by
+ * {@link org.apache.druid.server.coordinator.loading.StrategicSegmentAssigner}
+ * to choose optimal servers to load, move or drop a segment.
*/
public interface BalancerStrategy
{
/**
- * Finds the best server to move a segment to according to the balancing
strategy.
+ * Finds the best server from the list of {@code destinationServers} to load
+ * the {@code segmentToMove}, if it is moved from the {@code sourceServer}.
+ * <p>
+ * In order to avoid unnecessary moves when the segment is already optimally
placed,
+ * include the {@code sourceServer} in the list of {@code
destinationServers}.
*
- * @param proposalSegment segment to move
- * @param sourceServer Server the segment is currently placed on.
- * @param destinationServers servers to consider as move destinations
- * @return The server to move to, or null if no move should be made or no
server is suitable
+ * @return The server to move to, or null if the segment is already
optimally placed.
*/
@Nullable
ServerHolder findDestinationServerToMoveSegment(
- DataSegment proposalSegment,
+ DataSegment segmentToMove,
ServerHolder sourceServer,
List<ServerHolder> destinationServers
);
/**
- * Finds the best servers on which to place the {@code proposalSegment}.
- * This method can be used both for placing the first copy of a segment
- * in the tier or a replica of the segment.
+ * Finds the best servers to load the given segment. This method can be used
+ * both for placing the first copy of a segment in a tier or a replica of an
+ * already available segment.
*
- * @param proposalSegment segment to place on servers
- * @param serverHolders servers to consider as segment homes
- * @return Iterator over the best servers (in order) on which the segment
- * can be placed.
+ * @return Iterator over the best servers (in order of preference) to load
+ * the segment.
*/
Iterator<ServerHolder> findServersToLoadSegment(
- DataSegment proposalSegment,
+ DataSegment segmentToLoad,
List<ServerHolder> serverHolders
);
/**
- * Returns an iterator for a set of servers to drop from, ordered by
preference of which server to drop from first
- * for a given drop strategy. One or more segments may be dropped, depending
on how much the segment is
- * over-replicated.
- * @param toDropSegment segment to drop from one or more servers
- * @param serverHolders set of historicals to consider dropping from
- * @return Iterator for set of historicals, ordered by drop preference
+ * Finds the best servers to drop the given segment.
+ *
+ * @return Iterator over the servers (in order of preference) to drop the
segment
*/
- Iterator<ServerHolder> pickServersToDropSegment(DataSegment toDropSegment,
NavigableSet<ServerHolder> serverHolders);
+ Iterator<ServerHolder> findServersToDropSegment(DataSegment segmentToDrop,
List<ServerHolder> serverHolders);
/**
- * Add balancing strategy stats during the 'balanceTier' operation of
- * {@link BalanceSegments} to be included
- * @param tier historical tier being balanced
- * @param stats stats object to add balancing strategy stats to
- * @param serverHolderList servers in tier being balanced
+ * Returns the stats collected by the strategy in the current run and resets
+ * the stats collector for the next run.
*/
- void emitStats(String tier, CoordinatorRunStats stats, List<ServerHolder>
serverHolderList);
+ CoordinatorRunStats getAndResetStats();
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategy.java
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategy.java
index 424657991c..eda9928915 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategy.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategy.java
@@ -28,10 +28,8 @@ import org.apache.druid.timeline.DataSegment;
import java.util.Collections;
import java.util.Set;
-
public class CachingCostBalancerStrategy extends CostBalancerStrategy
{
-
private final ClusterCostCache clusterCostCache;
public CachingCostBalancerStrategy(ClusterCostCache clusterCostCache,
ListeningExecutorService exec)
@@ -41,13 +39,8 @@ public class CachingCostBalancerStrategy extends
CostBalancerStrategy
}
@Override
- protected double computeCost(DataSegment proposalSegment, ServerHolder
server, boolean includeCurrentServer)
+ protected double computePlacementCost(DataSegment proposalSegment,
ServerHolder server)
{
- // (optional) Don't include server if it cannot load the segment
- if (!includeCurrentServer && !server.canLoadSegment(proposalSegment)) {
- return Double.POSITIVE_INFINITY;
- }
-
final String serverName = server.getServer().getName();
double cost = clusterCostCache.computeCost(serverName, proposalSegment);
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
index 726dffe690..0ddacaead7 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java
@@ -125,6 +125,10 @@ public class CachingCostBalancerStrategyFactory implements
BalancerStrategyFacto
@Override
public BalancerStrategy createBalancerStrategy(final
ListeningExecutorService exec)
{
+ LOG.warn(
+ "'cachingCost' balancer strategy has been deprecated as it can lead to"
+ + " unbalanced clusters. Use 'cost' strategy instead."
+ );
if (!isInitialized() && config.isAwaitInitialization()) {
try {
final long startMillis = System.currentTimeMillis();
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java
index 6d165c1f34..aae907bb59 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategy.java
@@ -19,16 +19,21 @@
package org.apache.druid.server.coordinator.balancer;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.commons.math3.util.FastMath;
+import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.server.coordinator.SegmentCountsPerInterval;
import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.loading.SegmentAction;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
@@ -38,11 +43,10 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
-import java.util.NavigableSet;
import java.util.PriorityQueue;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
public class CostBalancerStrategy implements BalancerStrategy
{
@@ -63,6 +67,20 @@ public class CostBalancerStrategy implements BalancerStrategy
= Comparator.<Pair<Double, ServerHolder>, Double>comparing(pair ->
pair.lhs)
.thenComparing(pair -> pair.rhs);
+ private final CoordinatorRunStats stats = new CoordinatorRunStats();
+
+ public static double computeJointSegmentsCost(DataSegment segment,
Iterable<DataSegment> segmentSet)
+ {
+ final Interval costComputeInterval = getCostComputeInterval(segment);
+ double totalCost = 0;
+ for (DataSegment s : segmentSet) {
+ if (costComputeInterval.overlaps(s.getInterval())) {
+ totalCost += computeJointSegmentsCost(segment, s);
+ }
+ }
+ return totalCost;
+ }
+
/**
* This defines the unnormalized cost function between two segments.
*
@@ -83,15 +101,20 @@ public class CostBalancerStrategy implements
BalancerStrategy
final Interval intervalA = segmentA.getInterval();
final Interval intervalB = segmentB.getInterval();
+ // constant cost-multiplier for segments of the same datsource
+ final double multiplier =
segmentA.getDataSource().equals(segmentB.getDataSource())
+ ? 2.0 : 1.0;
+ return intervalCost(intervalA, intervalB) * multiplier;
+ }
+
+ public static double intervalCost(Interval intervalA, Interval intervalB)
+ {
final double t0 = intervalA.getStartMillis();
final double t1 = (intervalA.getEndMillis() - t0) / MILLIS_FACTOR;
final double start = (intervalB.getStartMillis() - t0) / MILLIS_FACTOR;
final double end = (intervalB.getEndMillis() - t0) / MILLIS_FACTOR;
- // constant cost-multiplier for segments of the same datsource
- final double multiplier =
segmentA.getDataSource().equals(segmentB.getDataSource()) ? 2.0 : 1.0;
-
- return INV_LAMBDA_SQUARE * intervalCost(t1, start, end) * multiplier;
+ return INV_LAMBDA_SQUARE * intervalCost(t1, start, end);
}
/**
@@ -199,161 +222,119 @@ public class CostBalancerStrategy implements
BalancerStrategy
@Override
public Iterator<ServerHolder> findServersToLoadSegment(
- DataSegment proposalSegment,
+ DataSegment segmentToLoad,
List<ServerHolder> serverHolders
)
{
- return getServersByPlacementCost(proposalSegment, serverHolders, false,
"findServersToLoadSegment");
+ return orderServersByPlacementCost(segmentToLoad, serverHolders,
SegmentAction.LOAD)
+ .stream()
+ .filter(server -> server.canLoadSegment(segmentToLoad))
+ .iterator();
}
-
@Override
public ServerHolder findDestinationServerToMoveSegment(
- DataSegment proposalSegment,
+ DataSegment segmentToMove,
ServerHolder sourceServer,
List<ServerHolder> serverHolders
)
{
- Iterator<ServerHolder> servers =
- getServersByPlacementCost(proposalSegment, serverHolders, true,
"findServerToMoveSegment");
- return servers.hasNext() ? servers.next() : null;
- }
-
- public static double computeJointSegmentsCost(DataSegment segment,
Iterable<DataSegment> segmentSet)
- {
- double totalCost = 0;
- for (DataSegment s : segmentSet) {
- totalCost += computeJointSegmentsCost(segment, s);
+ List<ServerHolder> servers =
+ orderServersByPlacementCost(segmentToMove, serverHolders,
SegmentAction.MOVE_TO);
+ if (servers.isEmpty()) {
+ return null;
}
- return totalCost;
+
+ ServerHolder candidateServer = servers.get(0);
+ return candidateServer.equals(sourceServer) ? null : candidateServer;
}
@Override
- public Iterator<ServerHolder> pickServersToDropSegment(
+ public Iterator<ServerHolder> findServersToDropSegment(
DataSegment segmentToDrop,
- NavigableSet<ServerHolder> serverHolders
+ List<ServerHolder> serverHolders
)
{
- List<ServerHolder> serversByCost = Lists.newArrayList(
- getServersByPlacementCost(segmentToDrop, serverHolders, true,
"pickServersToDropSegment")
- );
+ List<ServerHolder> serversByCost =
+ orderServersByPlacementCost(segmentToDrop, serverHolders,
SegmentAction.DROP);
// Prioritize drop from highest cost servers
return Lists.reverse(serversByCost).iterator();
}
- /**
- * Calculates the initial cost of the Druid segment configuration.
- *
- * @param serverHolders A list of ServerHolders for a particular tier.
- *
- * @return The initial cost of the Druid tier.
- */
- public double calculateInitialTotalCost(final List<ServerHolder>
serverHolders)
+ @Override
+ public CoordinatorRunStats getAndResetStats()
{
- double cost = 0;
- for (ServerHolder server : serverHolders) {
- // segments are dumped into an array because it's probably better than
iterating the iterateAllSegments() result
- // quadratically in a loop, which can generate garbage in the form of
Stream, Spliterator, Iterator, etc. objects
- // whose total memory volume exceeds the size of the DataSegment array.
- DataSegment[] segments =
server.getServer().iterateAllSegments().toArray(new DataSegment[0]);
- for (DataSegment s1 : segments) {
- for (DataSegment s2 : segments) {
- cost += computeJointSegmentsCost(s1, s2);
- }
- }
- }
- return cost;
+ return stats.getSnapshotAndReset();
}
/**
- * Calculates the cost normalization. This is such that the normalized cost
is lower bounded
- * by 1 (e.g. when each segment gets its own historical node).
- *
- * @param serverHolders A list of ServerHolders for a particular tier.
- *
- * @return The normalization value (the sum of the diagonal entries in the
- * pairwise cost matrix). This is the cost of a cluster if each
- * segment were to get its own historical node.
+ * Computes the cost of placing a segment on this server.
*/
- public double calculateNormalization(final List<ServerHolder> serverHolders)
- {
- double cost = 0;
- for (ServerHolder server : serverHolders) {
- for (DataSegment segment : server.getServedSegments()) {
- cost += computeJointSegmentsCost(segment, segment);
- }
- }
- return cost;
- }
-
- @Override
- public void emitStats(String tier, CoordinatorRunStats stats,
List<ServerHolder> serverHolderList)
- {
- final double initialTotalCost =
calculateInitialTotalCost(serverHolderList);
- final double normalization = calculateNormalization(serverHolderList);
- final double normalizedInitialCost = initialTotalCost / normalization;
-
- final RowKey rowKey = RowKey.forTier(tier);
- stats.add(Stats.Balancer.RAW_COST, rowKey, (long) initialTotalCost);
- stats.add(Stats.Balancer.NORMALIZATION_COST, rowKey, (long) normalization);
- stats.add(Stats.Balancer.NORMALIZED_COST_X_1000, rowKey, (long)
(normalizedInitialCost * 1000));
-
- log.info(
- "[%s]: Initial Total Cost: [%f], Normalization: [%f], Initial
Normalized Cost: [%f]",
- tier, initialTotalCost, normalization, normalizedInitialCost
- );
- }
-
- protected double computeCost(
- final DataSegment proposalSegment,
- final ServerHolder server,
- final boolean includeCurrentServer
- )
+ protected double computePlacementCost(DataSegment proposalSegment,
ServerHolder server)
{
- // (optional) Don't include server if it cannot load the segment
- if (!includeCurrentServer && !server.canLoadSegment(proposalSegment)) {
- return Double.POSITIVE_INFINITY;
- }
+ final Interval costComputeInterval =
getCostComputeInterval(proposalSegment);
- // The contribution to the total cost of a given server by proposing to
move the segment to that server is...
- double cost = 0d;
+ // Compute number of segments in each interval
+ final Object2IntOpenHashMap<Interval> intervalToSegmentCount = new
Object2IntOpenHashMap<>();
- // the sum of the costs of segments expected to be on the server (loaded +
loading - dropping)
- Set<DataSegment> projectedSegments = server.getProjectedSegments();
- cost += computeJointSegmentsCost(proposalSegment, projectedSegments);
+ final SegmentCountsPerInterval projectedSegments =
server.getProjectedSegments();
+
projectedSegments.getIntervalToTotalSegmentCount().object2IntEntrySet().forEach(entry
-> {
+ final Interval interval = entry.getKey();
+ if (costComputeInterval.overlaps(interval)) {
+ intervalToSegmentCount.addTo(interval, entry.getIntValue());
+ }
+ });
+
+ // Count the segments for the same datasource twice as they have twice the
cost
+ final String datasource = proposalSegment.getDataSource();
+
projectedSegments.getIntervalToSegmentCount(datasource).object2IntEntrySet().forEach(entry
-> {
+ final Interval interval = entry.getKey();
+ if (costComputeInterval.overlaps(interval)) {
+ intervalToSegmentCount.addTo(interval, entry.getIntValue());
+ }
+ });
- // minus the self cost of the segment
- if (projectedSegments.contains(proposalSegment)) {
- cost -= computeJointSegmentsCost(proposalSegment, proposalSegment);
+ // Compute joint cost for each interval
+ double cost = 0;
+ final Interval segmentInterval = proposalSegment.getInterval();
+ cost += intervalToSegmentCount.object2IntEntrySet().stream().mapToDouble(
+ entry -> intervalCost(segmentInterval, entry.getKey())
+ * entry.getIntValue()
+ ).sum();
+
+ // Minus the self cost of the segment
+ if (server.isProjectedSegment(proposalSegment)) {
+ cost -= intervalCost(segmentInterval, segmentInterval) * 2.0;
}
return cost;
}
/**
- * Returns an iterator over the servers, ordered by increasing cost for
- * placing the given segment on that server.
- *
- * @param includeCurrentServer true if the server already serving a replica
- * of this segment should be included in the
results
+ * Orders the servers by increasing cost for placing the given segment.
*/
- private Iterator<ServerHolder> getServersByPlacementCost(
- DataSegment proposalSegment,
- Iterable<ServerHolder> serverHolders,
- boolean includeCurrentServer,
- String action
+ private List<ServerHolder> orderServersByPlacementCost(
+ DataSegment segment,
+ List<ServerHolder> serverHolders,
+ SegmentAction action
)
{
+ final Stopwatch computeTime = Stopwatch.createStarted();
final List<ListenableFuture<Pair<Double, ServerHolder>>> futures = new
ArrayList<>();
for (ServerHolder server : serverHolders) {
futures.add(
exec.submit(
- () -> Pair.of(computeCost(proposalSegment, server,
includeCurrentServer), server)
+ () -> Pair.of(computePlacementCost(segment, server), server)
)
);
}
+ String tier = serverHolders.isEmpty() ? null :
serverHolders.get(0).getServer().getTier();
+ final RowKey metricKey = RowKey.with(Dimension.TIER, tier)
+ .with(Dimension.DATASOURCE,
segment.getDataSource())
+ .and(Dimension.DESCRIPTION, action.name());
+
final PriorityQueue<Pair<Double, ServerHolder>> costPrioritizedServers =
new PriorityQueue<>(CHEAPEST_SERVERS_FIRST);
try {
@@ -364,30 +345,57 @@ public class CostBalancerStrategy implements
BalancerStrategy
);
}
catch (Exception e) {
- alertOnFailure(e, action);
+ stats.add(Stats.Balancer.COMPUTATION_ERRORS, metricKey, 1);
+ handleFailure(e, segment, action);
}
- // Include current server only if specified
- return costPrioritizedServers.stream()
- .filter(pair -> includeCurrentServer ||
pair.rhs.canLoadSegment(proposalSegment))
- .map(pair -> pair.rhs).iterator();
+ // Report computation stats
+ computeTime.stop();
+ stats.add(Stats.Balancer.COMPUTATION_COUNT, metricKey, 1);
+ stats.add(Stats.Balancer.COMPUTATION_TIME, metricKey,
computeTime.elapsed(TimeUnit.MILLISECONDS));
+
+ return costPrioritizedServers.stream().map(pair -> pair.rhs)
+ .collect(Collectors.toList());
}
- private void alertOnFailure(Exception e, String action)
+ private void handleFailure(
+ Exception e,
+ DataSegment segment,
+ SegmentAction action
+ )
{
- // Do not alert if the executor has been shutdown
+ final String reason;
+ String suggestion = "";
if (exec.isShutdown()) {
- log.noStackTrace().info("Balancer executor was terminated. Failing
action [%s].", action);
- return;
+ reason = "Executor shutdown";
+ } else if (e instanceof TimeoutException) {
+ reason = "Timed out";
+ suggestion = " Try setting a higher value for 'balancerComputeThreads'.";
+ } else {
+ reason = e.getMessage();
}
- final boolean hasTimedOut = e instanceof TimeoutException;
- final String message = StringUtils.format(
- "Cost balancer strategy %s in action [%s].%s",
- hasTimedOut ? "timed out" : "failed", action,
- hasTimedOut ? " Try setting a higher value of
'balancerComputeThreads'." : ""
- );
- log.makeAlert(e, message).emit();
+ String msgFormat = "Cost strategy computations failed for action[%s] on
segment[%s] due to reason[%s].[%s]";
+ log.noStackTrace().warn(e, msgFormat, action, segment.getId(), reason,
suggestion);
+ }
+
+ /**
+ * The cost compute interval for a segment is {@code [start-45days,
end+45days)}.
+ * This is because the joint cost of any two segments that are 45 days apart
is
+ * negligible.
+ */
+ private static Interval getCostComputeInterval(DataSegment segment)
+ {
+ final Interval segmentInterval = segment.getInterval();
+ if (Intervals.isEternity(segmentInterval)) {
+ return segmentInterval;
+ } else {
+ final long maxGap = TimeUnit.DAYS.toMillis(45);
+ return Intervals.utc(
+ segmentInterval.getStartMillis() - maxGap,
+ segmentInterval.getEndMillis() + maxGap
+ );
+ }
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategy.java
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategy.java
index cee292930c..601e5b042e 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategy.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/DiskNormalizedCostBalancerStrategy.java
@@ -35,13 +35,12 @@ public class DiskNormalizedCostBalancerStrategy extends
CostBalancerStrategy
* This ensures that all the hosts will have the same % disk utilization.
*/
@Override
- protected double computeCost(
+ protected double computePlacementCost(
final DataSegment proposalSegment,
- final ServerHolder server,
- final boolean includeCurrentServer
+ final ServerHolder server
)
{
- double cost = super.computeCost(proposalSegment, server,
includeCurrentServer);
+ double cost = super.computePlacementCost(proposalSegment, server);
if (cost == Double.POSITIVE_INFINITY) {
return cost;
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java
index 98b1d8bdc7..cccc2518e8 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java
@@ -27,7 +27,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.NavigableSet;
import java.util.stream.Collectors;
/**
@@ -39,16 +38,18 @@ import java.util.stream.Collectors;
*/
public class RandomBalancerStrategy implements BalancerStrategy
{
+ private static final CoordinatorRunStats EMPTY_STATS = new
CoordinatorRunStats();
+
@Override
public Iterator<ServerHolder> findServersToLoadSegment(
- DataSegment proposalSegment,
+ DataSegment segmentToLoad,
List<ServerHolder> serverHolders
)
{
// Filter out servers which cannot load this segment
final List<ServerHolder> usableServerHolders =
serverHolders.stream()
- .filter(server -> server.canLoadSegment(proposalSegment))
+ .filter(server -> server.canLoadSegment(segmentToLoad))
.collect(Collectors.toList());
Collections.shuffle(usableServerHolders);
return usableServerHolders.iterator();
@@ -56,7 +57,7 @@ public class RandomBalancerStrategy implements
BalancerStrategy
@Override
public ServerHolder findDestinationServerToMoveSegment(
- DataSegment proposalSegment,
+ DataSegment segmentToMove,
ServerHolder sourceServer,
List<ServerHolder> serverHolders
)
@@ -66,7 +67,7 @@ public class RandomBalancerStrategy implements
BalancerStrategy
}
@Override
- public Iterator<ServerHolder> pickServersToDropSegment(DataSegment
toDropSegment, NavigableSet<ServerHolder> serverHolders)
+ public Iterator<ServerHolder> findServersToDropSegment(DataSegment
segmentToDrop, List<ServerHolder> serverHolders)
{
List<ServerHolder> serverList = new ArrayList<>(serverHolders);
Collections.shuffle(serverList);
@@ -74,7 +75,8 @@ public class RandomBalancerStrategy implements
BalancerStrategy
}
@Override
- public void emitStats(String tier, CoordinatorRunStats stats,
List<ServerHolder> serverHolderList)
+ public CoordinatorRunStats getAndResetStats()
{
+ return EMPTY_STATS;
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java
index a5ab8d0d47..86e5b99629 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java
@@ -19,7 +19,6 @@
package org.apache.druid.server.coordinator.balancer;
-import com.google.common.collect.Lists;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
@@ -56,11 +55,9 @@ public class TierSegmentBalancer
private final DruidCoordinatorRuntimeParams params;
private final StrategicSegmentAssigner segmentAssigner;
- private final BalancerStrategy strategy;
private final SegmentLoadingConfig loadingConfig;
private final CoordinatorRunStats runStats;
- private final Set<ServerHolder> allServers;
private final List<ServerHolder> activeServers;
private final List<ServerHolder> decommissioningServers;
private final int totalMaxSegmentsToMove;
@@ -77,7 +74,6 @@ public class TierSegmentBalancer
this.params = params;
this.segmentAssigner = params.getSegmentAssigner();
- this.strategy = params.getBalancerStrategy();
this.loadingConfig = params.getSegmentLoadingConfig();
this.totalMaxSegmentsToMove = loadingConfig.getMaxSegmentsToMove();
this.runStats = segmentAssigner.getStats();
@@ -86,7 +82,6 @@ public class TierSegmentBalancer
servers.stream().collect(Collectors.partitioningBy(ServerHolder::isDecommissioning));
this.decommissioningServers = partitions.get(true);
this.activeServers = partitions.get(false);
- this.allServers = servers;
this.movingSegmentCount =
activeServers.stream().mapToInt(ServerHolder::getNumMovingSegments).sum();
}
@@ -128,10 +123,6 @@ public class TierSegmentBalancer
"Moved [%d] segments out of max [%d] between active servers in tier
[%s].",
movedGeneralSegments, maxGeneralSegmentsToMove, tier
);
-
- if (loadingConfig.isEmitBalancingStats()) {
- strategy.emitStats(tier, runStats, Lists.newArrayList(allServers));
- }
}
private int moveSegmentsFromTo(
@@ -224,13 +215,9 @@ public class TierSegmentBalancer
private void markUnmoved(String reason, DataSegment segment)
{
- final RowKey key
- = RowKey.builder()
- .add(Dimension.TIER, tier)
- .add(Dimension.DATASOURCE, segment.getDataSource())
- .add(Dimension.DESCRIPTION, reason)
- .build();
-
+ RowKey key = RowKey.with(Dimension.TIER, tier)
+ .with(Dimension.DATASOURCE, segment.getDataSource())
+ .and(Dimension.DESCRIPTION, reason);
runStats.add(Stats.Segments.MOVE_SKIPPED, key, 1);
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
index 865fde2a0b..4ff2e65743 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
@@ -24,6 +24,7 @@ import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.balancer.TierSegmentBalancer;
import org.apache.druid.server.coordinator.loading.SegmentLoadingConfig;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
/**
*
@@ -57,6 +58,11 @@ public class BalanceSegments implements CoordinatorDuty
(tier, servers) -> new TierSegmentBalancer(tier, servers, params).run()
);
+ CoordinatorRunStats runStats = params.getCoordinatorStats();
+ params.getBalancerStrategy()
+ .getAndResetStats()
+ .forEachStat(runStats::add);
+
return params;
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
index b5825cb084..5109ec3c3f 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CollectSegmentAndServerStats.java
@@ -72,7 +72,7 @@ public class CollectSegmentAndServerStats implements
CoordinatorDuty
final DruidCluster cluster = params.getDruidCluster();
cluster.getHistoricals().forEach((tier, historicals) -> {
- final RowKey rowKey = RowKey.forTier(tier);
+ final RowKey rowKey = RowKey.of(Dimension.TIER, tier);
stats.add(Stats.Tier.HISTORICAL_COUNT, rowKey, historicals.size());
long totalCapacity =
historicals.stream().map(ServerHolder::getMaxSize).reduce(0L, Long::sum);
stats.add(Stats.Tier.TOTAL_CAPACITY, rowKey, totalCapacity);
@@ -80,20 +80,23 @@ public class CollectSegmentAndServerStats implements
CoordinatorDuty
// Collect load queue stats
coordinator.getLoadManagementPeons().forEach((serverName, queuePeon) -> {
- final RowKey rowKey = RowKey.builder().add(Dimension.SERVER,
serverName).build();
+ final RowKey rowKey = RowKey.of(Dimension.SERVER, serverName);
stats.add(Stats.SegmentQueue.BYTES_TO_LOAD, rowKey,
queuePeon.getSizeOfSegmentsToLoad());
stats.add(Stats.SegmentQueue.NUM_TO_LOAD, rowKey,
queuePeon.getSegmentsToLoad().size());
stats.add(Stats.SegmentQueue.NUM_TO_DROP, rowKey,
queuePeon.getSegmentsToDrop().size());
queuePeon.getAndResetStats().forEachStat(
- (dimValues, stat, statValue) ->
- stats.add(stat, createRowKeyForServer(serverName, dimValues),
statValue)
+ (stat, key, statValue) ->
+ stats.add(stat, createRowKeyForServer(serverName,
key.getValues()), statValue)
);
});
coordinator.getDatasourceToUnavailableSegmentCount().forEach(
- (dataSource, numUnavailable) ->
- stats.addToDatasourceStat(Stats.Segments.UNAVAILABLE, dataSource,
numUnavailable)
+ (dataSource, numUnavailable) -> stats.add(
+ Stats.Segments.UNAVAILABLE,
+ RowKey.of(Dimension.DATASOURCE, dataSource),
+ numUnavailable
+ )
);
coordinator.getTierToDatasourceToUnderReplicatedCount(false).forEach(
@@ -108,17 +111,18 @@ public class CollectSegmentAndServerStats implements
CoordinatorDuty
(dataSource, timeline) -> {
long totalSizeOfUsedSegments = timeline.iterateAllObjects().stream()
.mapToLong(DataSegment::getSize).sum();
- stats.addToDatasourceStat(Stats.Segments.USED_BYTES, dataSource,
totalSizeOfUsedSegments);
- stats.addToDatasourceStat(Stats.Segments.USED, dataSource,
timeline.getNumObjects());
+
+ RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, dataSource);
+ stats.add(Stats.Segments.USED_BYTES, datasourceKey,
totalSizeOfUsedSegments);
+ stats.add(Stats.Segments.USED, datasourceKey,
timeline.getNumObjects());
}
);
}
private RowKey createRowKeyForServer(String serverName, Map<Dimension,
String> dimensionValues)
{
- final RowKey.Builder builder = RowKey.builder();
- dimensionValues.forEach(builder::add);
- builder.add(Dimension.SERVER, serverName);
+ final RowKey.Builder builder = RowKey.with(Dimension.SERVER, serverName);
+ dimensionValues.forEach(builder::with);
return builder.build();
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index d767c51a92..0381a70ba2 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -43,6 +43,7 @@ import
org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
@@ -593,7 +594,7 @@ public class CompactSegments implements
CoordinatorCustomDuty
CoordinatorRunStats stats
)
{
- final RowKey rowKey = RowKey.forDatasource(dataSource);
+ final RowKey rowKey = RowKey.of(Dimension.DATASOURCE, dataSource);
stats.add(Stats.Compaction.PENDING_BYTES, rowKey,
autoCompactionSnapshot.getBytesAwaitingCompaction());
stats.add(Stats.Compaction.PENDING_SEGMENTS, rowKey,
autoCompactionSnapshot.getSegmentCountAwaitingCompaction());
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java
index 1a63083bf7..3cb9f0064c 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/MarkOvershadowedSegmentsAsUnused.java
@@ -28,6 +28,8 @@ import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
@@ -100,7 +102,8 @@ public class MarkOvershadowedSegmentsAsUnused implements
CoordinatorDuty
final CoordinatorRunStats stats = params.getCoordinatorStats();
datasourceToUnusedSegments.forEach(
(datasource, unusedSegments) -> {
- stats.addToDatasourceStat(Stats.Segments.OVERSHADOWED, datasource,
unusedSegments.size());
+ RowKey datasourceKey = RowKey.of(Dimension.DATASOURCE, datasource);
+ stats.add(Stats.Segments.OVERSHADOWED, datasourceKey,
unusedSegments.size());
coordinator.markSegmentsAsUnused(datasource, unusedSegments);
}
);
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java
index eb037d1212..a5c4eb5804 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java
@@ -67,7 +67,7 @@ public class RunRules implements CoordinatorDuty
final Set<DataSegment> overshadowed =
params.getDataSourcesSnapshot().getOvershadowedSegments();
final Set<DataSegment> usedSegments = params.getUsedSegments();
log.info(
- "Applying retention rules on [%d] used segments, skipping [%d]
overshadowed segments.",
+ "Applying retention rules on [%,d] used segments, skipping [%,d]
overshadowed segments.",
usedSegments.size(), overshadowed.size()
);
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
index 9c133fce8c..92dd4714a4 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java
@@ -40,6 +40,8 @@ import
org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.CoordinatorStat;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpHeaders;
@@ -543,7 +545,9 @@ public class HttpLoadQueuePeon implements LoadQueuePeon
private void incrementStat(SegmentHolder holder, RequestStatus status)
{
- stats.addToDatasourceStat(status.datasourceStat,
holder.getSegment().getDataSource(), 1);
+ RowKey rowKey = RowKey.with(Dimension.DATASOURCE,
holder.getSegment().getDataSource())
+ .and(Dimension.DESCRIPTION,
holder.getAction().name());
+ stats.add(status.datasourceStat, rowKey, 1);
}
private void executeCallbacks(SegmentHolder holder, boolean success)
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
index 559950c18e..fbf867fd91 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
@@ -61,7 +61,7 @@ public class SegmentLoadingConfig
log.info(
"Smart segment loading is enabled. Recomputed
replicationThrottleLimit"
- + " [%,d] (%d%% of used segments [%,d]) and maxSegmentsToMove [%d].",
+ + " [%,d] (%d%% of used segments [%,d]) and maxSegmentsToMove
[%,d].",
replicationThrottleLimit, throttlePercentage, numUsedSegments,
maxSegmentsToMove
);
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
index 4fc2e176f7..6546cce69c 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
@@ -33,6 +33,7 @@ import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -296,7 +297,7 @@ public class StrategicSegmentAssigner implements
SegmentActionHandler
private void reportTierCapacityStats(DataSegment segment, int
requiredReplicas, String tier)
{
- final RowKey rowKey = RowKey.forTier(tier);
+ final RowKey rowKey = RowKey.of(Dimension.TIER, tier);
stats.updateMax(Stats.Tier.REPLICATION_FACTOR, rowKey, requiredReplicas);
stats.add(Stats.Tier.REQUIRED_CAPACITY, rowKey, segment.getSize() *
requiredReplicas);
}
@@ -342,7 +343,8 @@ public class StrategicSegmentAssigner implements
SegmentActionHandler
public void deleteSegment(DataSegment segment)
{
loadQueueManager.deleteSegment(segment);
- stats.addToDatasourceStat(Stats.Segments.DELETED, segment.getDataSource(),
1);
+ RowKey rowKey = RowKey.of(Dimension.DATASOURCE, segment.getDataSource());
+ stats.add(Stats.Segments.DELETED, rowKey, 1);
}
/**
@@ -429,9 +431,9 @@ public class StrategicSegmentAssigner implements
SegmentActionHandler
if (numToDrop > numDropsQueued) {
remainingNumToDrop = numToDrop - numDropsQueued;
Iterator<ServerHolder> serverIterator =
- (useRoundRobinAssignment || eligibleLiveServers.size() >=
remainingNumToDrop)
+ (useRoundRobinAssignment || eligibleLiveServers.size() <=
remainingNumToDrop)
? eligibleLiveServers.iterator()
- : strategy.pickServersToDropSegment(segment, eligibleLiveServers);
+ : strategy.findServersToDropSegment(segment, new
ArrayList<>(eligibleLiveServers));
numDropsQueued += dropReplicasFromServers(remainingNumToDrop, segment,
serverIterator, tier);
}
@@ -493,7 +495,7 @@ public class StrategicSegmentAssigner implements
SegmentActionHandler
? serverSelector.getServersInTierToLoadSegment(tier, segment)
: strategy.findServersToLoadSegment(segment, eligibleServers);
if (!serverIterator.hasNext()) {
- incrementSkipStat(Stats.Segments.ASSIGN_SKIPPED, "No server chosen by
strategy", segment, tier);
+ incrementSkipStat(Stats.Segments.ASSIGN_SKIPPED, "No strategic server",
segment, tier);
return 0;
}
@@ -586,16 +588,10 @@ public class StrategicSegmentAssigner implements
SegmentActionHandler
private void incrementSkipStat(CoordinatorStat stat, String reason,
DataSegment segment, String tier)
{
- final RowKey.Builder keyBuilder
- = RowKey.builder()
- .add(Dimension.TIER, tier)
- .add(Dimension.DATASOURCE, segment.getDataSource());
-
- if (reason != null) {
- keyBuilder.add(Dimension.DESCRIPTION, reason);
- }
-
- stats.add(stat, keyBuilder.build(), 1);
+ final RowKey key = RowKey.with(Dimension.TIER, tier)
+ .with(Dimension.DATASOURCE,
segment.getDataSource())
+ .and(Dimension.DESCRIPTION, reason);
+ stats.add(stat, key, 1);
}
private void incrementStat(CoordinatorStat stat, DataSegment segment, String
tier, long value)
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
index 7d63c78c47..f8ea6cf98f 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
@@ -64,12 +64,7 @@ public class CoordinatorRunStats
public long getSegmentStat(CoordinatorStat stat, String tier, String
datasource)
{
- return get(stat, RowKey.builder().add(Dimension.DATASOURCE,
datasource).add(Dimension.TIER, tier).build());
- }
-
- public long getDataSourceStat(CoordinatorStat stat, String dataSource)
- {
- return get(stat, RowKey.forDatasource(dataSource));
+ return get(stat, RowKey.with(Dimension.DATASOURCE,
datasource).and(Dimension.TIER, tier));
}
public long get(CoordinatorStat stat)
@@ -87,7 +82,7 @@ public class CoordinatorRunStats
{
allStats.forEach(
(rowKey, stats) -> stats.object2LongEntrySet().fastForEach(
- stat -> handler.handle(rowKey.getValues(), stat.getKey(),
stat.getLongValue())
+ stat -> handler.handle(stat.getKey(), rowKey, stat.getLongValue())
)
);
}
@@ -199,16 +194,10 @@ public class CoordinatorRunStats
.addTo(stat, value);
}
- public void addToDatasourceStat(CoordinatorStat stat, String dataSource,
long value)
- {
- add(stat, RowKey.forDatasource(dataSource), value);
- }
-
public void addToSegmentStat(CoordinatorStat stat, String tier, String
datasource, long value)
{
- RowKey rowKey = RowKey.builder()
- .add(Dimension.TIER, tier)
- .add(Dimension.DATASOURCE, datasource).build();
+ RowKey rowKey = RowKey.with(Dimension.TIER, tier)
+ .and(Dimension.DATASOURCE, datasource);
add(stat, rowKey, value);
}
@@ -267,7 +256,7 @@ public class CoordinatorRunStats
public interface StatHandler
{
- void handle(Map<Dimension, String> dimensionValues, CoordinatorStat stat,
long statValue);
+ void handle(CoordinatorStat stat, RowKey rowKey, long statValue);
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/stats/RowKey.java
b/server/src/main/java/org/apache/druid/server/coordinator/stats/RowKey.java
index 1893f86a9c..b0ee0a2d1f 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/stats/RowKey.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/RowKey.java
@@ -40,19 +40,16 @@ public class RowKey
this.hashCode = Objects.hash(values);
}
- public static Builder builder()
+ public static Builder with(Dimension dimension, String value)
{
- return new RowKey.Builder();
+ Builder builder = new Builder();
+ builder.with(dimension, value);
+ return builder;
}
- public static RowKey forTier(String tier)
+ public static RowKey of(Dimension dimension, String value)
{
- return RowKey.builder().add(Dimension.TIER, tier).build();
- }
-
- public static RowKey forDatasource(String datasource)
- {
- return RowKey.builder().add(Dimension.DATASOURCE, datasource).build();
+ return with(dimension, value).build();
}
public Map<Dimension, String> getValues()
@@ -83,12 +80,18 @@ public class RowKey
{
private final Map<Dimension, String> values = new
EnumMap<>(Dimension.class);
- public Builder add(Dimension dimension, String value)
+ public Builder with(Dimension dimension, String value)
{
values.put(dimension, value);
return this;
}
+ public RowKey and(Dimension dimension, String value)
+ {
+ values.put(dimension, value);
+ return new RowKey(values);
+ }
+
public RowKey build()
{
return new RowKey(values);
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
index 791d3963dd..9864aa6b3a 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/stats/Stats.java
@@ -131,11 +131,11 @@ public class Stats
public static class Balancer
{
- public static final CoordinatorStat RAW_COST
- = new CoordinatorStat("initialCost", "segment/cost/raw");
- public static final CoordinatorStat NORMALIZATION_COST
- = new CoordinatorStat("normaliznCost", "segment/cost/normalization");
- public static final CoordinatorStat NORMALIZED_COST_X_1000
- = new CoordinatorStat("normalizedCost", "segment/cost/normalized");
+ public static final CoordinatorStat COMPUTATION_ERRORS
+ = new CoordinatorStat("costComputeError",
"segment/balancer/compute/error");
+ public static final CoordinatorStat COMPUTATION_TIME
+ = new CoordinatorStat("costComputeTime",
"segment/balancer/compute/time");
+ public static final CoordinatorStat COMPUTATION_COUNT
+ = new CoordinatorStat("costComputeCount",
"segment/balancer/compute/count");
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
index 3354080069..c2c77f02cb 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
@@ -161,14 +161,14 @@ public class CoordinatorRunStatsTest
);
expected.forEach(
(duty, count) ->
- stats.add(STAT_1, RowKey.builder().add(Dimension.DUTY,
duty).build(), count)
+ stats.add(STAT_1, RowKey.of(Dimension.DUTY, duty), count)
);
final Map<String, Long> actual = new HashMap<>();
stats.forEachStat(
- (dimensionValues, stat, value) -> {
+ (stat, rowKey, value) -> {
if (stat.equals(STAT_1)) {
- actual.put(dimensionValues.get(Dimension.DUTY), value);
+ actual.put(rowKey.getValues().get(Dimension.DUTY), value);
}
}
);
@@ -192,7 +192,7 @@ public class CoordinatorRunStatsTest
debugStats.add(DEBUG_STAT_1, Key.TIER_1, 1);
Assert.assertTrue(debugStats.hasStat(DEBUG_STAT_1));
- debugStats.addToDatasourceStat(DEBUG_STAT_2, "wiki", 1);
+ debugStats.add(DEBUG_STAT_2, RowKey.of(Dimension.DATASOURCE, "wiki"), 1);
Assert.assertFalse(debugStats.hasStat(DEBUG_STAT_2));
}
@@ -201,11 +201,11 @@ public class CoordinatorRunStatsTest
*/
private static class Key
{
- static final RowKey TIER_1 = RowKey.forTier("tier1");
- static final RowKey TIER_2 = RowKey.forTier("tier2");
+ static final RowKey TIER_1 = RowKey.of(Dimension.TIER, "tier1");
+ static final RowKey TIER_2 = RowKey.of(Dimension.TIER, "tier2");
- static final RowKey DUTY_1 = RowKey.builder().add(Dimension.DUTY,
"duty1").build();
- static final RowKey DUTY_2 = RowKey.builder().add(Dimension.DUTY,
"duty2").build();
+ static final RowKey DUTY_1 = RowKey.of(Dimension.DUTY, "duty1");
+ static final RowKey DUTY_2 = RowKey.of(Dimension.DUTY, "duty2");
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java
index f4cad1cb5c..23975352b3 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyTest.java
@@ -38,6 +38,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -102,7 +103,7 @@ public class CachingCostBalancerStrategyTest
.findDestinationServerToMoveSegment(s, firstServer,
serverHolderList);
ServerHolder s2 = costBalancerStrategy
.findDestinationServerToMoveSegment(s, firstServer,
serverHolderList);
- return
(s1.getServer().getName().equals(s2.getServer().getName())) ? 0 : 1;
+ return Objects.equals(s1, s2) ? 0 : 1;
}
)
.sum();
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
index 6580425764..17d0a8716a 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/balancer/CostBalancerStrategyTest.java
@@ -21,17 +21,19 @@ package org.apache.druid.server.coordinator.balancer;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.GranularityType;
import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.java.util.emitter.core.Event;
-import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.loading.LoadQueuePeonTester;
-import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.junit.After;
import org.junit.Assert;
@@ -61,7 +63,7 @@ public class CostBalancerStrategyTest
@Before
public void setup()
{
- balancerExecutor = new BlockingExecutorService("test-balance-exec-%d");
+ balancerExecutor = Execs.singleThreaded("test-balance-exec-%d");
strategy = new
CostBalancerStrategy(MoreExecutors.listeningDecorator(balancerExecutor));
serviceEmitter = new StubServiceEmitter("test-service", "host");
@@ -241,7 +243,7 @@ public class CostBalancerStrategyTest
}
@Test
- public void testComputeCost()
+ public void testComputePlacementCost()
{
// Create segments for different granularities
final List<DataSegment> daySegments =
@@ -265,7 +267,7 @@ public class CostBalancerStrategyTest
.withNumPartitions(30)
.eachOfSizeInMb(100);
- // Distribute the segments randomly amongst 2 servers
+ // Distribute the segments randomly amongst 3 servers
final List<DataSegment> segments = new ArrayList<>(daySegments);
segments.addAll(monthSegments);
segments.addAll(yearSegments);
@@ -284,35 +286,66 @@ public class CostBalancerStrategyTest
server -> new ServerHolder(server.toImmutableDruidServer(), new
LoadQueuePeonTester())
).collect(Collectors.toList());
+ final ServerHolder serverA = serverHolders.get(0);
+ final ServerHolder serverB = serverHolders.get(1);
+ final ServerHolder serverC = serverHolders.get(2);
+
// Verify costs for DAY, MONTH and YEAR segments
- verifyServerCosts(
- daySegments.get(0),
- serverHolders,
- 5191.500804, 8691.392080, 6418.467818
- );
- verifyServerCosts(
- monthSegments.get(0),
- serverHolders,
- 301935.940609, 301935.940606, 304333.677669
- );
- verifyServerCosts(
- yearSegments.get(0),
- serverHolders,
- 8468764.380437, 12098919.896931, 14501440.169452
- );
+ final DataSegment daySegment = daySegments.get(0);
+ verifyPlacementCost(daySegment, serverA, 5191.500804);
+ verifyPlacementCost(daySegment, serverB, 8691.392080);
+ verifyPlacementCost(daySegment, serverC, 6418.467818);
+
+ final DataSegment monthSegment = monthSegments.get(0);
+ verifyPlacementCost(monthSegment, serverA, 301935.940609);
+ verifyPlacementCost(monthSegment, serverB, 301935.940606);
+ verifyPlacementCost(monthSegment, serverC, 304333.677669);
+
+ final DataSegment yearSegment = yearSegments.get(0);
+ verifyPlacementCost(yearSegment, serverA, 8468764.380437);
+ verifyPlacementCost(yearSegment, serverB, 12098919.896931);
+ verifyPlacementCost(yearSegment, serverC, 14501440.169452);
// Verify costs for an ALL granularity segment
- DataSegment allGranularitySegment =
+ final DataSegment allGranularitySegment =
CreateDataSegments.ofDatasource(DS_WIKI)
.forIntervals(1, Granularities.ALL)
.eachOfSizeInMb(100).get(0);
- verifyServerCosts(
- allGranularitySegment,
- serverHolders,
- 1.1534173737329768e7,
- 1.6340633534241956e7,
- 1.9026400521582970e7
+ verifyPlacementCost(allGranularitySegment, serverA, 1.1534173737329768e7);
+ verifyPlacementCost(allGranularitySegment, serverB, 1.6340633534241956e7);
+ verifyPlacementCost(allGranularitySegment, serverC, 1.9026400521582970e7);
+ }
+
+ @Test
+ public void testGetAndResetStats()
+ {
+ final ServerHolder serverA = new ServerHolder(
+ createHistorical().toImmutableDruidServer(),
+ new LoadQueuePeonTester()
+ );
+ final ServerHolder serverB = new ServerHolder(
+ createHistorical().toImmutableDruidServer(),
+ new LoadQueuePeonTester()
);
+
+ final DataSegment segment =
CreateDataSegments.ofDatasource(DS_WIKI).eachOfSizeInMb(100).get(0);
+
+ // Verify that computation stats have been tracked
+ strategy.findServersToLoadSegment(segment, Arrays.asList(serverA,
serverB));
+ CoordinatorRunStats computeStats = strategy.getAndResetStats();
+
+ final RowKey rowKey = RowKey.with(Dimension.DATASOURCE, DS_WIKI)
+ .with(Dimension.DESCRIPTION, "LOAD")
+ .and(Dimension.TIER, "hot");
+ Assert.assertEquals(1L, computeStats.get(Stats.Balancer.COMPUTATION_COUNT,
rowKey));
+
+ long computeTime = computeStats.get(Stats.Balancer.COMPUTATION_TIME,
rowKey);
+ Assert.assertTrue(computeTime >= 0 && computeTime <= 100);
+
Assert.assertFalse(computeStats.hasStat(Stats.Balancer.COMPUTATION_ERRORS));
+
+ // Verify that stats have been reset
+ computeStats = strategy.getAndResetStats();
+ Assert.assertEquals(0, computeStats.rowCount());
}
@Test
@@ -334,42 +367,24 @@ public class CostBalancerStrategyTest
);
}
- @Test(timeout = 90_000L)
- public void testFindServerRaisesAlertOnTimeout()
+ /**
+ * Verifies that the cost of placing the segment on the server is as
expected.
+ * Also verifies that this cost is equal to the total joint cost of this
segment
+ * with each segment currently on the server.
+ */
+ private void verifyPlacementCost(DataSegment segment, ServerHolder server,
double expectedCost)
{
- DataSegment segment = CreateDataSegments.ofDatasource(DS_WIKI)
- .forIntervals(1, Granularities.DAY)
- .startingAt("2012-10-24")
- .eachOfSizeInMb(100).get(0);
-
- final LoadQueuePeonTester peon = new LoadQueuePeonTester();
- ServerHolder serverA = new
ServerHolder(createHistorical().toImmutableDruidServer(), peon);
- ServerHolder serverB = new
ServerHolder(createHistorical().toImmutableDruidServer(), peon);
-
- strategy.findServersToLoadSegment(segment, Arrays.asList(serverA,
serverB));
-
- List<Event> events = serviceEmitter.getEvents();
- Assert.assertEquals(1, events.size());
- Assert.assertTrue(events.get(0) instanceof AlertEvent);
-
- AlertEvent alertEvent = (AlertEvent) events.get(0);
- Assert.assertEquals(
- "Cost balancer strategy timed out in action
[findServersToLoadSegment]."
- + " Try setting a higher value of 'balancerComputeThreads'.",
- alertEvent.getDescription()
- );
- }
+ double observedCost = strategy.computePlacementCost(segment, server);
+ Assert.assertEquals(expectedCost, observedCost, DELTA);
- private void verifyServerCosts(
- DataSegment segment,
- List<ServerHolder> serverHolders,
- double... expectedCosts
- )
- {
- for (int i = 0; i < serverHolders.size(); ++i) {
- double observedCost = strategy.computeCost(segment,
serverHolders.get(i), true);
- Assert.assertEquals(expectedCosts[i], observedCost, DELTA);
+ double totalJointSegmentCost = 0;
+ for (DataSegment segmentOnServer :
server.getServer().iterateAllSegments()) {
+ totalJointSegmentCost +=
CostBalancerStrategy.computeJointSegmentsCost(segment, segmentOnServer);
+ }
+ if (server.isServingSegment(segment)) {
+ totalJointSegmentCost -=
CostBalancerStrategy.computeJointSegmentsCost(segment, segment);
}
+ Assert.assertEquals(totalJointSegmentCost, observedCost, DELTA);
}
private void verifyJointSegmentsCost(
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
index a506443e8e..39dcc9ce50 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/BalanceSegmentsTest.java
@@ -37,7 +37,6 @@ import
org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
@@ -132,13 +131,8 @@ public class BalanceSegmentsTest
@Test
public void testMoveDecommissioningMaxPercentOfMaxSegmentsToMove()
{
- final ServerHolder serverHolder1 = createHolder(server1, false, segment1,
segment2);
- final ServerHolder serverHolder2 = createHolder(server2, true, segment3,
segment4);
- final ServerHolder serverHolder3 = createHolder(server3, false);
-
- BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
- expectFindDestinationAndReturn(strategy, serverHolder3);
- EasyMock.replay(strategy);
+ final ServerHolder serverHolder1 = createHolder(server1, true, segment1,
segment2, segment3, segment4);
+ final ServerHolder serverHolder2 = createHolder(server2, false);
// ceil(3 * 0.6) = 2 segments from decommissioning servers
CoordinatorDynamicConfig dynamicConfig =
@@ -148,24 +142,21 @@ public class BalanceSegmentsTest
.withDecommissioningMaxPercentOfMaxSegmentsToMove(60)
.build();
DruidCoordinatorRuntimeParams params =
- defaultRuntimeParamsBuilder(serverHolder1, serverHolder2,
serverHolder3)
+ defaultRuntimeParamsBuilder(serverHolder1, serverHolder2)
.withDynamicConfigs(dynamicConfig)
- .withBalancerStrategy(strategy)
+ .withBalancerStrategy(balancerStrategy)
.withBroadcastDatasources(broadcastDatasources)
.withSegmentAssignerUsing(loadQueueManager)
.build();
CoordinatorRunStats stats = runBalancer(params);
- EasyMock.verify(strategy);
-
- // 2 segments are moved from the decommissioning server and 1 from the
active server
- Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.MOVED,
"normal", "datasource1"));
- Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.MOVED,
"normal", "datasource2"));
- Set<DataSegment> segmentsMoved =
serverHolder3.getPeon().getSegmentsToLoad();
- Assert.assertTrue(segmentsMoved.contains(segment3));
- Assert.assertTrue(segmentsMoved.contains(segment4));
- Assert.assertTrue(segmentsMoved.contains(segment1) ||
segmentsMoved.contains(segment2));
+ // 2 segments are moved from the decommissioning server
+ long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal",
"datasource1")
+ + stats.getSegmentStat(Stats.Segments.MOVED, "normal",
"datasource2");
+ Assert.assertEquals(2L, totalMoved);
+ Set<DataSegment> segmentsMoved =
serverHolder2.getPeon().getSegmentsToLoad();
+ Assert.assertEquals(2, segmentsMoved.size());
}
@Test
@@ -220,33 +211,27 @@ public class BalanceSegmentsTest
@Test
public void
testMoveDecommissioningMaxPercentOfMaxSegmentsToMoveWithNoDecommissioning()
{
- final ServerHolder serverHolder1 = createHolder(server1, segment1,
segment2);
- final ServerHolder serverHolder2 = createHolder(server2, segment3,
segment4);
- final ServerHolder serverHolder3 = createHolder(server3);
-
- BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
- expectFindDestinationAndReturn(strategy, serverHolder3);
- EasyMock.replay(strategy);
+ final ServerHolder serverHolder1 = createHolder(server1, segment1,
segment2, segment3, segment4);
+ final ServerHolder serverHolder2 = createHolder(server2);
CoordinatorDynamicConfig dynamicConfig =
CoordinatorDynamicConfig.builder()
.withSmartSegmentLoading(false)
- .withMaxSegmentsToMove(3)
+ .withMaxSegmentsToMove(4)
.withDecommissioningMaxPercentOfMaxSegmentsToMove(9)
.build();
DruidCoordinatorRuntimeParams params =
- defaultRuntimeParamsBuilder(serverHolder1, serverHolder2,
serverHolder3)
+ defaultRuntimeParamsBuilder(serverHolder1, serverHolder2)
.withDynamicConfigs(dynamicConfig)
- .withBalancerStrategy(strategy)
+ .withBalancerStrategy(balancerStrategy)
.withSegmentAssignerUsing(loadQueueManager)
.build();
CoordinatorRunStats stats = runBalancer(params);
- EasyMock.verify(strategy);
long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal",
"datasource1")
+ stats.getSegmentStat(Stats.Segments.MOVED, "normal",
"datasource2");
- Assert.assertEquals(3L, totalMoved);
- Assert.assertEquals(3, serverHolder3.getPeon().getSegmentsToLoad().size());
+ Assert.assertEquals(2L, totalMoved);
+ Assert.assertEquals(2, serverHolder2.getPeon().getSegmentsToLoad().size());
}
/**
@@ -258,18 +243,13 @@ public class BalanceSegmentsTest
final ServerHolder activeServer = createHolder(server1, false,
allSegments);
final ServerHolder decommissioningServer = createHolder(server2, true);
- BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
- expectFindDestinationAndReturn(strategy, decommissioningServer);
- EasyMock.replay(strategy);
-
DruidCoordinatorRuntimeParams params =
defaultRuntimeParamsBuilder(activeServer, decommissioningServer)
- .withBalancerStrategy(strategy)
+ .withBalancerStrategy(balancerStrategy)
.withBroadcastDatasources(broadcastDatasources)
.build();
CoordinatorRunStats stats = runBalancer(params);
- EasyMock.verify(strategy);
Assert.assertFalse(stats.hasStat(Stats.Segments.MOVED));
}
@@ -279,22 +259,17 @@ public class BalanceSegmentsTest
final ServerHolder decommissioningServer = createHolder(server1, true,
allSegments);
final ServerHolder activeServer = createHolder(server2);
- BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
- expectFindDestinationAndReturn(strategy, activeServer);
- EasyMock.replay(strategy);
-
DruidCoordinatorRuntimeParams params =
defaultRuntimeParamsBuilder(decommissioningServer, activeServer)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder()
.withSmartSegmentLoading(false)
.withMaxSegmentsToMove(3).build()
)
- .withBalancerStrategy(strategy)
+ .withBalancerStrategy(balancerStrategy)
.withBroadcastDatasources(broadcastDatasources)
.build();
runBalancer(params);
- EasyMock.verify(strategy);
Assert.assertEquals(0,
decommissioningServer.getPeon().getSegmentsToLoad().size());
Assert.assertEquals(3, activeServer.getPeon().getSegmentsToLoad().size());
}
@@ -358,10 +333,6 @@ public class BalanceSegmentsTest
final ServerHolder holder2 = createHolder(server2, segment3, segment4);
final ServerHolder holder3 = createHolder(server3);
- BalancerStrategy strategy = EasyMock.createMock(BalancerStrategy.class);
- expectFindDestinationAndReturn(strategy, holder3);
- EasyMock.replay(strategy);
-
DruidCoordinatorRuntimeParams params =
defaultRuntimeParamsBuilder(holder1, holder2, holder3)
.withDynamicConfigs(
@@ -372,12 +343,11 @@ public class BalanceSegmentsTest
.withPercentOfSegmentsToConsiderPerMove(40)
.build()
)
- .withBalancerStrategy(strategy)
+ .withBalancerStrategy(balancerStrategy)
.withBroadcastDatasources(broadcastDatasources)
.build();
CoordinatorRunStats stats = runBalancer(params);
- EasyMock.verify(strategy);
long totalMoved = stats.getSegmentStat(Stats.Segments.MOVED, "normal",
"datasource1")
+ stats.getSegmentStat(Stats.Segments.MOVED, "normal",
"datasource2");
Assert.assertEquals(1L, totalMoved);
@@ -463,17 +433,6 @@ public class BalanceSegmentsTest
);
}
- private void expectFindDestinationAndReturn(BalancerStrategy strategy,
ServerHolder chosenServer)
- {
- EasyMock.expect(
- strategy.findDestinationServerToMoveSegment(
- EasyMock.anyObject(),
- EasyMock.anyObject(),
- EasyMock.anyObject()
- )
- ).andReturn(chosenServer).anyTimes();
- }
-
private DataSegment createHourlySegment(String datasource, DateTime start,
String version)
{
return new DataSegment(
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 14fcd05d00..7487d332d1 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -2037,7 +2037,7 @@ public class CompactSegmentsTest
// all the datasources in the coordinator stats
final AtomicInteger numDatasources = new AtomicInteger();
stats.forEachStat(
- (dimensionValues, stat, value) -> {
+ (stat, rowKey, value) -> {
if (stat.equals(Stats.Compaction.PENDING_BYTES)
&& (expectedRemainingSegments <= 0 || value ==
expectedRemainingSegments)) {
numDatasources.incrementAndGet();
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java
index cd9c1e228d..46240a340d 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/duty/RunRulesTest.java
@@ -76,6 +76,7 @@ public class RunRulesTest
{
private static final long SERVER_SIZE_10GB = 10L << 30;
private static final String DATASOURCE = "test";
+ private static final RowKey DATASOURCE_STAT_KEY =
RowKey.of(Dimension.DATASOURCE, DATASOURCE);
private LoadQueuePeon mockPeon;
private RunRules ruleRunner;
@@ -563,7 +564,7 @@ public class RunRulesTest
.build();
CoordinatorRunStats stats = runDutyAndGetStats(params);
- Assert.assertEquals(12L, stats.getDataSourceStat(Stats.Segments.DELETED,
DATASOURCE));
+ Assert.assertEquals(12L, stats.get(Stats.Segments.DELETED,
DATASOURCE_STAT_KEY));
}
@Test
@@ -616,7 +617,7 @@ public class RunRulesTest
CoordinatorRunStats stats = runDutyAndGetStats(params);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED,
"normal", DATASOURCE));
- Assert.assertEquals(12L, stats.getDataSourceStat(Stats.Segments.DELETED,
DATASOURCE));
+ Assert.assertEquals(12L, stats.get(Stats.Segments.DELETED,
DATASOURCE_STAT_KEY));
EasyMock.verify(mockPeon);
}
@@ -662,7 +663,7 @@ public class RunRulesTest
CoordinatorRunStats stats = runDutyAndGetStats(params);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED,
"normal", DATASOURCE));
- Assert.assertEquals(12L, stats.getDataSourceStat(Stats.Segments.DELETED,
DATASOURCE));
+ Assert.assertEquals(12L, stats.get(Stats.Segments.DELETED,
DATASOURCE_STAT_KEY));
EasyMock.verify(mockPeon);
}
@@ -705,7 +706,7 @@ public class RunRulesTest
CoordinatorRunStats stats = runDutyAndGetStats(params);
Assert.assertFalse(stats.hasStat(Stats.Segments.DROPPED));
- Assert.assertEquals(12L, stats.getDataSourceStat(Stats.Segments.DELETED,
DATASOURCE));
+ Assert.assertEquals(12L, stats.get(Stats.Segments.DELETED,
DATASOURCE_STAT_KEY));
EasyMock.verify(mockPeon);
}
@@ -1188,7 +1189,7 @@ public class RunRulesTest
.build();
CoordinatorRunStats stats = runDutyAndGetStats(params);
- final RowKey tierRowKey = RowKey.builder().add(Dimension.TIER,
DruidServer.DEFAULT_TIER).build();
+ final RowKey tierRowKey = RowKey.of(Dimension.TIER,
DruidServer.DEFAULT_TIER);
Assert.assertEquals(
dataSegment.getSize() * numReplicants,
stats.get(Stats.Tier.REQUIRED_CAPACITY, tierRowKey)
@@ -1250,7 +1251,7 @@ public class RunRulesTest
.build();
CoordinatorRunStats stats = runDutyAndGetStats(params);
- final RowKey tierRowKey = RowKey.builder().add(Dimension.TIER,
DruidServer.DEFAULT_TIER).build();
+ final RowKey tierRowKey = RowKey.of(Dimension.TIER,
DruidServer.DEFAULT_TIER);
Assert.assertEquals(
dataSegment.getSize() * numReplicants,
stats.get(Stats.Tier.REQUIRED_CAPACITY, tierRowKey)
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
index 668013f7ff..c7e3b5b239 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.server.coordinator.rules;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@@ -77,11 +76,8 @@ public class LoadRuleTest
private ListeningExecutorService exec;
private BalancerStrategy balancerStrategy;
- private CachingCostBalancerStrategy cachingCostBalancerStrategy;
-
private SegmentLoadQueueManager loadQueueManager;
private final boolean useRoundRobinAssignment;
- private BalancerStrategy mockBalancerStrategy;
private final AtomicInteger serverId = new AtomicInteger();
@@ -101,9 +97,6 @@ public class LoadRuleTest
{
exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1,
"LoadRuleTest-%d"));
balancerStrategy = new
CostBalancerStrategyFactory().createBalancerStrategy(exec);
- cachingCostBalancerStrategy = new
CachingCostBalancerStrategy(ClusterCostCache.builder().build(), exec);
-
- mockBalancerStrategy = EasyMock.createMock(BalancerStrategy.class);
loadQueueManager = new SegmentLoadQueueManager(null, null, null);
}
@@ -119,13 +112,7 @@ public class LoadRuleTest
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
-
- if (!useRoundRobinAssignment) {
-
EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(),
EasyMock.anyObject()))
- .andDelegateTo(balancerStrategy)
- .times(2);
- }
- EasyMock.replay(mockPeon, mockBalancerStrategy);
+ EasyMock.replay(mockPeon);
DruidCluster druidCluster = DruidCluster
.builder()
@@ -140,14 +127,10 @@ public class LoadRuleTest
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T1, DS_WIKI));
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T2, DS_WIKI));
- EasyMock.verify(mockPeon, mockBalancerStrategy);
+ EasyMock.verify(mockPeon);
}
- private CoordinatorRunStats runRuleAndGetStats(
- LoadRule rule,
- DataSegment segment,
- DruidCluster cluster
- )
+ private CoordinatorRunStats runRuleAndGetStats(LoadRule rule, DataSegment
segment, DruidCluster cluster)
{
return runRuleAndGetStats(rule, segment,
makeCoordinatorRuntimeParams(cluster, segment));
}
@@ -160,7 +143,7 @@ public class LoadRuleTest
{
final StrategicSegmentAssigner segmentAssigner =
params.getSegmentAssigner();
rule.run(segment, segmentAssigner);
- return segmentAssigner.getStats();
+ return params.getCoordinatorStats();
}
private DruidCoordinatorRuntimeParams makeCoordinatorRuntimeParams(
@@ -171,7 +154,7 @@ public class LoadRuleTest
return DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc())
.withDruidCluster(druidCluster)
- .withBalancerStrategy(mockBalancerStrategy)
+ .withBalancerStrategy(balancerStrategy)
.withUsedSegmentsInTest(usedSegments)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder()
@@ -189,16 +172,7 @@ public class LoadRuleTest
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
-
- LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
-
- final DataSegment segment = createDataSegment(DS_WIKI);
-
-
EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(),
EasyMock.anyObject()))
- .andDelegateTo(balancerStrategy)
- .anyTimes();
-
- EasyMock.replay(mockPeon, mockBalancerStrategy);
+ EasyMock.replay(mockPeon);
ImmutableDruidServer server1 =
createServer(Tier.T1).toImmutableDruidServer();
ImmutableDruidServer server2 =
createServer(Tier.T1).toImmutableDruidServer();
@@ -207,6 +181,8 @@ public class LoadRuleTest
.addTier(Tier.T1, new ServerHolder(server1, mockPeon), new
ServerHolder(server2, mockPeon))
.build();
+ final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
+ final DataSegment segment = createDataSegment(DS_WIKI);
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment,
druidCluster);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T1, segment.getDataSource()));
@@ -223,7 +199,7 @@ public class LoadRuleTest
Assert.assertFalse(statsAfterLoadPrimary.hasStat(Stats.Segments.ASSIGNED));
- EasyMock.verify(mockPeon, mockBalancerStrategy);
+ EasyMock.verify(mockPeon);
}
@Test
@@ -233,16 +209,7 @@ public class LoadRuleTest
final LoadQueuePeon emptyPeon = createEmptyPeon();
emptyPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
-
- LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
-
- final DataSegment segment = createDataSegment(DS_WIKI);
-
-
EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(),
EasyMock.anyObject()))
- .andDelegateTo(balancerStrategy)
- .anyTimes();
-
- EasyMock.replay(emptyPeon, mockBalancerStrategy);
+ EasyMock.replay(emptyPeon);
ImmutableDruidServer server1 =
createServer(Tier.T1).toImmutableDruidServer();
ImmutableDruidServer server2 =
createServer(Tier.T1).toImmutableDruidServer();
@@ -251,6 +218,8 @@ public class LoadRuleTest
.addTier(Tier.T1, new ServerHolder(server1, emptyPeon), new
ServerHolder(server2, emptyPeon))
.build();
+ final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
+ final DataSegment segment = createDataSegment(DS_WIKI);
CoordinatorRunStats stats = runRuleAndGetStats(
rule,
segment,
@@ -277,7 +246,7 @@ public class LoadRuleTest
Assert.assertEquals(1L,
statsAfterLoadPrimary.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1,
DS_WIKI));
- EasyMock.verify(emptyPeon, mockBalancerStrategy);
+ EasyMock.verify(emptyPeon);
}
@Test
@@ -286,16 +255,7 @@ public class LoadRuleTest
final LoadQueuePeon emptyPeon = createEmptyPeon();
emptyPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
-
- LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
-
- final DataSegment segment = createDataSegment(DS_WIKI);
-
-
EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(),
EasyMock.anyObject()))
- .andDelegateTo(balancerStrategy)
- .anyTimes();
-
- EasyMock.replay(emptyPeon, mockBalancerStrategy);
+ EasyMock.replay(emptyPeon);
ImmutableDruidServer server1 =
createServer(Tier.T1).toImmutableDruidServer();
ImmutableDruidServer server2 =
createServer(Tier.T1).toImmutableDruidServer();
@@ -304,6 +264,8 @@ public class LoadRuleTest
.addTier(Tier.T1, new ServerHolder(server1, emptyPeon), new
ServerHolder(server2, emptyPeon))
.build();
+ final LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
+ final DataSegment segment = createDataSegment(DS_WIKI);
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment,
druidCluster);
// Ensure that the segment is assigned to one of the historicals
@@ -323,14 +285,12 @@ public class LoadRuleTest
Assert.assertFalse(statsAfterLoadPrimary.hasStat(Stats.Segments.ASSIGNED));
- EasyMock.verify(emptyPeon, mockBalancerStrategy);
+ EasyMock.verify(emptyPeon);
}
@Test
public void
testLoadUsedSegmentsForAllSegmentGranularityAndCachingCostBalancerStrategy()
{
- LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
-
final List<DataSegment> segments =
CreateDataSegments.ofDatasource(DS_WIKI)
.forIntervals(1, Granularities.ALL)
@@ -338,21 +298,18 @@ public class LoadRuleTest
.eachOfSizeInMb(100);
final LoadQueuePeon loadingPeon = createLoadingPeon(segments.get(0), true);
-
loadingPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.anyObject());
EasyMock.expectLastCall().once();
-
-
EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(),
EasyMock.anyObject()))
- .andDelegateTo(cachingCostBalancerStrategy)
- .anyTimes();
-
- EasyMock.replay(loadingPeon, mockBalancerStrategy);
+ EasyMock.replay(loadingPeon);
DruidCluster druidCluster = DruidCluster
.builder()
.addTier(Tier.T1, createServerHolder(Tier.T1, loadingPeon, false))
.build();
+ balancerStrategy = new
CachingCostBalancerStrategy(ClusterCostCache.builder().build(), exec);
+
+ LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1));
CoordinatorRunStats stats = runRuleAndGetStats(
rule,
segments.get(1),
@@ -360,7 +317,7 @@ public class LoadRuleTest
);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T1, DS_WIKI));
- EasyMock.verify(loadingPeon, mockBalancerStrategy);
+ EasyMock.verify(loadingPeon);
}
@Test
@@ -369,10 +326,7 @@ public class LoadRuleTest
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
-
EasyMock.expect(mockBalancerStrategy.pickServersToDropSegment(EasyMock.anyObject(),
EasyMock.anyObject()))
- .andDelegateTo(balancerStrategy)
- .times(4);
- EasyMock.replay(mockPeon, mockBalancerStrategy);
+ EasyMock.replay(mockPeon);
final DataSegment segment = createDataSegment(DS_WIKI);
@@ -407,14 +361,7 @@ public class LoadRuleTest
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(),
EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
-
- if (!useRoundRobinAssignment) {
-
EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(),
EasyMock.anyObject()))
- .andDelegateTo(balancerStrategy)
- .times(1);
- }
-
- EasyMock.replay(mockPeon, mockBalancerStrategy);
+ EasyMock.replay(mockPeon);
DruidCluster druidCluster = DruidCluster
.builder()
@@ -428,7 +375,7 @@ public class LoadRuleTest
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T1, DS_WIKI));
- EasyMock.verify(mockPeon, mockBalancerStrategy);
+ EasyMock.verify(mockPeon);
}
@Test
@@ -437,7 +384,7 @@ public class LoadRuleTest
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().atLeastOnce();
- EasyMock.replay(mockPeon, mockBalancerStrategy);
+ EasyMock.replay(mockPeon);
final DataSegment segment = createDataSegment(DS_WIKI);
@@ -460,19 +407,12 @@ public class LoadRuleTest
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.DROPPED,
Tier.T1, DS_WIKI));
- EasyMock.verify(mockPeon, mockBalancerStrategy);
+ EasyMock.verify(mockPeon);
}
@Test
public void testMaxLoadingQueueSize()
{
- if (!useRoundRobinAssignment) {
-
EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(),
EasyMock.anyObject()))
- .andDelegateTo(balancerStrategy)
- .times(2);
- }
- EasyMock.replay(mockBalancerStrategy);
-
final LoadQueuePeonTester peon = new LoadQueuePeonTester();
final int maxSegmentsInQueue = 2;
@@ -482,10 +422,7 @@ public class LoadRuleTest
Tier.T1,
new ServerHolder(
createServer(Tier.T1).toImmutableDruidServer(),
- peon,
- false,
- maxSegmentsInQueue,
- 10
+ peon, false, maxSegmentsInQueue, 10
)
)
.build();
@@ -497,7 +434,7 @@ public class LoadRuleTest
DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
.newBuilder(DateTimes.nowUtc())
.withDruidCluster(druidCluster)
- .withBalancerStrategy(mockBalancerStrategy)
+ .withBalancerStrategy(balancerStrategy)
.withUsedSegmentsInTest(dataSegment1, dataSegment2, dataSegment3)
.withDynamicConfigs(
CoordinatorDynamicConfig.builder()
@@ -517,8 +454,6 @@ public class LoadRuleTest
Assert.assertEquals(1L, stats1.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T1, dataSegment1.getDataSource()));
Assert.assertEquals(1L, stats2.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T1, dataSegment2.getDataSource()));
Assert.assertEquals(0L, stats3.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T1, dataSegment3.getDataSource()));
-
- EasyMock.verify(mockBalancerStrategy);
}
/**
@@ -530,14 +465,7 @@ public class LoadRuleTest
{
final LoadQueuePeon mockPeon1 = createEmptyPeon();
final LoadQueuePeon mockPeon2 = createOneCallPeonMock();
-
- if (!useRoundRobinAssignment) {
-
EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(EasyMock.anyObject(),
EasyMock.anyObject()))
- .andDelegateTo(balancerStrategy)
- .times(1);
- }
-
- EasyMock.replay(mockPeon1, mockPeon2, mockBalancerStrategy);
+ EasyMock.replay(mockPeon1, mockPeon2);
DruidCluster druidCluster = DruidCluster
.builder()
@@ -550,7 +478,7 @@ public class LoadRuleTest
CoordinatorRunStats stats = runRuleAndGetStats(rule, segment,
druidCluster);
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T2, DS_WIKI));
- EasyMock.verify(mockPeon1, mockPeon2, mockBalancerStrategy);
+ EasyMock.verify(mockPeon1, mockPeon2);
}
/**
@@ -572,15 +500,6 @@ public class LoadRuleTest
ServerHolder holder4 = createServerHolder(Tier.T2, mockPeon4, false);
final DataSegment segment = createDataSegment(DS_WIKI);
- if (!useRoundRobinAssignment) {
- EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(segment,
ImmutableList.of(holder2)))
- .andReturn(Collections.singletonList(holder2).iterator());
- EasyMock.expect(mockBalancerStrategy.findServersToLoadSegment(segment,
ImmutableList.of(holder4, holder3)))
- .andReturn(Arrays.asList(holder3, holder4).iterator());
- }
-
- EasyMock.replay(mockBalancerStrategy);
-
DruidCluster druidCluster = DruidCluster
.builder()
.addTier(Tier.T1, holder1, holder2)
@@ -593,7 +512,7 @@ public class LoadRuleTest
Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T1, DS_WIKI));
Assert.assertEquals(2L, stats.getSegmentStat(Stats.Segments.ASSIGNED,
Tier.T2, DS_WIKI));
- EasyMock.verify(mockPeon1, mockPeon2, mockPeon3, mockPeon4,
mockBalancerStrategy);
+ EasyMock.verify(mockPeon1, mockPeon2, mockPeon3, mockPeon4);
}
/**
@@ -606,10 +525,7 @@ public class LoadRuleTest
final LoadQueuePeon mockPeon = createEmptyPeon();
mockPeon.dropSegment(EasyMock.anyObject(), EasyMock.anyObject());
EasyMock.expectLastCall().times(2);
-
EasyMock.expect(mockBalancerStrategy.pickServersToDropSegment(EasyMock.anyObject(),
EasyMock.anyObject()))
- .andDelegateTo(balancerStrategy)
- .times(4);
- EasyMock.replay(mockPeon, mockBalancerStrategy);
+ EasyMock.replay(mockPeon);
final DataSegment segment1 = createDataSegment("foo1");
final DataSegment segment2 = createDataSegment("foo2");
@@ -652,10 +568,6 @@ public class LoadRuleTest
final LoadQueuePeon mockPeon1 = new LoadQueuePeonTester();
final LoadQueuePeon mockPeon2 = new LoadQueuePeonTester();
final LoadQueuePeon mockPeon3 = new LoadQueuePeonTester();
-
EasyMock.expect(mockBalancerStrategy.pickServersToDropSegment(EasyMock.anyObject(),
EasyMock.anyObject()))
- .andDelegateTo(balancerStrategy)
- .times(4);
- EasyMock.replay(mockBalancerStrategy);
final DataSegment segment1 = createDataSegment(DS_WIKI);
@@ -751,11 +663,11 @@ public class LoadRuleTest
return mockPeon2;
}
- private ServerHolder createServerHolder(String tier, LoadQueuePeon
mockPeon1, boolean isDecommissioning)
+ private ServerHolder createServerHolder(String tier, LoadQueuePeon
loadQueuePeon, boolean isDecommissioning)
{
return new ServerHolder(
createServer(tier).toImmutableDruidServer(),
- mockPeon1,
+ loadQueuePeon,
isDecommissioning
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]