This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 31c2179 Coordinator fix balancer stuck (#5987)
31c2179 is described below
commit 31c2179fe1daa564e918f4ba64937743a45132a1
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Jul 11 20:19:11 2018 -0700
Coordinator fix balancer stuck (#5987)
* this will fix it
* filter destinations to not consider servers already serving segment
* fix it
* cleanup
* fix opposite day in ImmutableDruidServer.equals
* simplify
---
.../java/io/druid/client/ImmutableDruidServer.java | 6 +--
.../helper/DruidCoordinatorBalancer.java | 56 ++++++++++++++++------
2 files changed, 43 insertions(+), 19 deletions(-)
diff --git a/server/src/main/java/io/druid/client/ImmutableDruidServer.java b/server/src/main/java/io/druid/client/ImmutableDruidServer.java
index ef4fc66..22d5e71 100644
--- a/server/src/main/java/io/druid/client/ImmutableDruidServer.java
+++ b/server/src/main/java/io/druid/client/ImmutableDruidServer.java
@@ -142,11 +142,7 @@ public class ImmutableDruidServer
ImmutableDruidServer that = (ImmutableDruidServer) o;
- if (metadata.equals(that.metadata)) {
- return false;
- }
-
- return true;
+ return metadata.equals(that.metadata);
}
@Override
diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
index 23f8dcb..8547954 100644
--- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
+++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
@@ -93,9 +93,11 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
CoordinatorStats stats
)
{
- final BalancerStrategy strategy = params.getBalancerStrategy();
- final int maxSegmentsToMove =
params.getCoordinatorDynamicConfig().getMaxSegmentsToMove();
+ if (params.getAvailableSegments().size() == 0) {
+ log.info("Metadata segments are not available. Cannot balance.");
+ return;
+ }
currentlyMovingSegments.computeIfAbsent(tier, t -> new
ConcurrentHashMap<>());
if (!currentlyMovingSegments.get(tier).isEmpty()) {
@@ -117,33 +119,59 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
numSegments += sourceHolder.getServer().getSegments().size();
}
+
if (numSegments == 0) {
log.info("No segments found. Cannot balance.");
return;
}
+ final BalancerStrategy strategy = params.getBalancerStrategy();
+ final int maxSegmentsToMove =
Math.min(params.getCoordinatorDynamicConfig().getMaxSegmentsToMove(),
numSegments);
+ final int maxIterations = 2 * maxSegmentsToMove;
final int maxToLoad =
params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue();
long unmoved = 0L;
- for (int moved = 0; (moved + unmoved) < maxSegmentsToMove;) {
- final BalancerSegmentHolder segmentToMove =
strategy.pickSegmentToMove(toMoveFrom);
- if (segmentToMove != null &&
params.getAvailableSegments().contains(segmentToMove.getSegment())) {
- final List<ServerHolder> toMoveToWithLoadQueueCapacity =
+ for (int moved = 0, iter = 0; (moved + unmoved) < maxSegmentsToMove;
++iter) {
+ final BalancerSegmentHolder segmentToMoveHolder =
strategy.pickSegmentToMove(toMoveFrom);
+
+ if (segmentToMoveHolder != null &&
params.getAvailableSegments().contains(segmentToMoveHolder.getSegment())) {
+ final DataSegment segmentToMove = segmentToMoveHolder.getSegment();
+ final ImmutableDruidServer fromServer =
segmentToMoveHolder.getFromServer();
+ // we want to leave the server the segment is currently on in the
list...
+ // but filter out replicas that are already serving the segment, and
servers with a full load queue
+ final List<ServerHolder>
toMoveToWithLoadQueueCapacityAndNotServingSegment =
toMoveTo.stream()
- .filter(s -> maxToLoad <= 0 ||
s.getNumberOfSegmentsInQueue() < maxToLoad)
+ .filter(s -> s.getServer().equals(fromServer) ||
+ (!s.isServingSegment(segmentToMove) &&
+ (maxToLoad <= 0 ||
s.getNumberOfSegmentsInQueue() < maxToLoad)))
.collect(Collectors.toList());
- final ServerHolder destinationHolder =
- strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(),
toMoveToWithLoadQueueCapacity);
-
- if (destinationHolder != null) {
- moveSegment(segmentToMove, destinationHolder.getServer(), params);
- moved++;
+ if (toMoveToWithLoadQueueCapacityAndNotServingSegment.size() > 0) {
+ final ServerHolder destinationHolder =
+ strategy.findNewSegmentHomeBalancer(segmentToMove,
toMoveToWithLoadQueueCapacityAndNotServingSegment);
+
+ if (destinationHolder != null &&
!destinationHolder.getServer().equals(fromServer)) {
+ moveSegment(segmentToMoveHolder, destinationHolder.getServer(),
params);
+ moved++;
+ } else {
+ log.info("Segment [%s] is 'optimally' placed.",
segmentToMove.getIdentifier());
+ unmoved++;
+ }
} else {
- log.info("Segment [%s] is 'optimally' placed.",
segmentToMove.getSegment().getIdentifier());
+ log.info(
+ "No valid movement destinations for segment [%s].",
+ segmentToMove.getIdentifier()
+ );
unmoved++;
}
}
+ if (iter >= maxIterations) {
+ log.info(
+ "Unable to select %d remaining candidate segments out of %d total
to balance after %d iterations, ending run.",
+ (maxSegmentsToMove - moved - unmoved), maxSegmentsToMove, iter
+ );
+ break;
+ }
}
if (unmoved == maxSegmentsToMove) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]