This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 31c2179  Coordinator fix balancer stuck (#5987)
31c2179 is described below

commit 31c2179fe1daa564e918f4ba64937743a45132a1
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Jul 11 20:19:11 2018 -0700

    Coordinator fix balancer stuck (#5987)
    
    * this will fix it
    
    * filter destinations to not consider servers already serving segment
    
    * fix it
    
    * cleanup
    
    * fix opposite day in ImmutableDruidServer.equals
    
    * simplify
---
 .../java/io/druid/client/ImmutableDruidServer.java |  6 +--
 .../helper/DruidCoordinatorBalancer.java           | 56 ++++++++++++++++------
 2 files changed, 43 insertions(+), 19 deletions(-)

diff --git a/server/src/main/java/io/druid/client/ImmutableDruidServer.java b/server/src/main/java/io/druid/client/ImmutableDruidServer.java
index ef4fc66..22d5e71 100644
--- a/server/src/main/java/io/druid/client/ImmutableDruidServer.java
+++ b/server/src/main/java/io/druid/client/ImmutableDruidServer.java
@@ -142,11 +142,7 @@ public class ImmutableDruidServer
 
     ImmutableDruidServer that = (ImmutableDruidServer) o;
 
-    if (metadata.equals(that.metadata)) {
-      return false;
-    }
-
-    return true;
+    return metadata.equals(that.metadata);
   }
 
   @Override
diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
index 23f8dcb..8547954 100644
--- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
+++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
@@ -93,9 +93,11 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
       CoordinatorStats stats
   )
   {
-    final BalancerStrategy strategy = params.getBalancerStrategy();
-    final int maxSegmentsToMove = 
params.getCoordinatorDynamicConfig().getMaxSegmentsToMove();
 
+    if (params.getAvailableSegments().size() == 0) {
+      log.info("Metadata segments are not available. Cannot balance.");
+      return;
+    }
     currentlyMovingSegments.computeIfAbsent(tier, t -> new 
ConcurrentHashMap<>());
 
     if (!currentlyMovingSegments.get(tier).isEmpty()) {
@@ -117,33 +119,59 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
       numSegments += sourceHolder.getServer().getSegments().size();
     }
 
+
     if (numSegments == 0) {
       log.info("No segments found.  Cannot balance.");
       return;
     }
 
+    final BalancerStrategy strategy = params.getBalancerStrategy();
+    final int maxSegmentsToMove = 
Math.min(params.getCoordinatorDynamicConfig().getMaxSegmentsToMove(), 
numSegments);
+    final int maxIterations = 2 * maxSegmentsToMove;
     final int maxToLoad = 
params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue();
     long unmoved = 0L;
-    for (int moved = 0; (moved + unmoved) < maxSegmentsToMove;) {
-      final BalancerSegmentHolder segmentToMove = 
strategy.pickSegmentToMove(toMoveFrom);
 
-      if (segmentToMove != null && 
params.getAvailableSegments().contains(segmentToMove.getSegment())) {
-        final List<ServerHolder> toMoveToWithLoadQueueCapacity =
+    for (int moved = 0, iter = 0; (moved + unmoved) < maxSegmentsToMove; 
++iter) {
+      final BalancerSegmentHolder segmentToMoveHolder = 
strategy.pickSegmentToMove(toMoveFrom);
+
+      if (segmentToMoveHolder != null && 
params.getAvailableSegments().contains(segmentToMoveHolder.getSegment())) {
+        final DataSegment segmentToMove = segmentToMoveHolder.getSegment();
+        final ImmutableDruidServer fromServer = 
segmentToMoveHolder.getFromServer();
+        // we want to leave the server the segment is currently on in the 
list...
+        // but filter out replicas that are already serving the segment, and 
servers with a full load queue
+        final List<ServerHolder> 
toMoveToWithLoadQueueCapacityAndNotServingSegment =
             toMoveTo.stream()
-                    .filter(s -> maxToLoad <= 0 || 
s.getNumberOfSegmentsInQueue() < maxToLoad)
+                    .filter(s -> s.getServer().equals(fromServer) ||
+                                 (!s.isServingSegment(segmentToMove) &&
+                                  (maxToLoad <= 0 || 
s.getNumberOfSegmentsInQueue() < maxToLoad)))
                     .collect(Collectors.toList());
 
-        final ServerHolder destinationHolder =
-            strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), 
toMoveToWithLoadQueueCapacity);
-
-        if (destinationHolder != null) {
-          moveSegment(segmentToMove, destinationHolder.getServer(), params);
-          moved++;
+        if (toMoveToWithLoadQueueCapacityAndNotServingSegment.size() > 0) {
+          final ServerHolder destinationHolder =
+              strategy.findNewSegmentHomeBalancer(segmentToMove, 
toMoveToWithLoadQueueCapacityAndNotServingSegment);
+
+          if (destinationHolder != null && 
!destinationHolder.getServer().equals(fromServer)) {
+            moveSegment(segmentToMoveHolder, destinationHolder.getServer(), 
params);
+            moved++;
+          } else {
+            log.info("Segment [%s] is 'optimally' placed.", 
segmentToMove.getIdentifier());
+            unmoved++;
+          }
         } else {
-          log.info("Segment [%s] is 'optimally' placed.", 
segmentToMove.getSegment().getIdentifier());
+          log.info(
+              "No valid movement destinations for segment [%s].",
+              segmentToMove.getIdentifier()
+          );
           unmoved++;
         }
       }
+      if (iter >= maxIterations) {
+        log.info(
+            "Unable to select %d remaining candidate segments out of %d total 
to balance after %d iterations, ending run.",
+            (maxSegmentsToMove - moved - unmoved), maxSegmentsToMove, iter
+        );
+        break;
+      }
     }
 
     if (unmoved == maxSegmentsToMove) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to