clintropolis commented on code in PR #14584:
URL: https://github.com/apache/druid/pull/14584#discussion_r1296520325


##########
server/src/main/java/org/apache/druid/server/coordinator/balancer/SegmentToMoveCalculator.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.balancer;
+
+import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordinator.SegmentCountsPerInterval;
+import org.apache.druid.server.coordinator.ServerHolder;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Calculates the maximum, minimum and required number of segments to move in a
+ * Coordinator run for balancing.
+ */
+public class SegmentToMoveCalculator
+{
+  /**
+   * At least this number of segments must be picked for moving in every cycle
+   * to keep the cluster well balanced.
+   */
+  private static final int MIN_SEGMENTS_TO_MOVE = 100;
+
+  private static final Logger log = new Logger(SegmentToMoveCalculator.class);
+
+  /**
+   * Calculates the number of segments to be picked for moving in the given tier,
+   * based on the level of skew between the historicals in the tier.
+   *
+   * @param tier                    Name of tier used for logging purposes
+   * @param historicals             Active historicals in tier
+   * @param maxSegmentsToMoveInTier Maximum number of segments allowed to be moved
+   *                                in the tier.
+   * @return Number of segments to move in the tier in the range
+   * [{@link #MIN_SEGMENTS_TO_MOVE}, {@code maxSegmentsToMoveInTier}].
+   */
+  public static int computeNumSegmentsToMoveInTier(
+      String tier,
+      List<ServerHolder> historicals,
+      int maxSegmentsToMoveInTier
+  )
+  {
+    final int totalSegments = historicals.stream().mapToInt(
+        server -> server.getProjectedSegments().getTotalSegmentCount()
+    ).sum();
+
+    // Move at least some segments to ensure that the cluster is always balancing itself
+    final int minSegmentsToMove = SegmentToMoveCalculator
+        .computeMinSegmentsToMoveInTier(totalSegments);
+    final int segmentsToMoveToFixDeviation = SegmentToMoveCalculator
+        .computeNumSegmentsToMoveToBalanceTier(tier, historicals);
+    log.info(
+        "Need to move [%,d] segments in tier[%s] to attain balance. Allowed values are [min=%d, max=%d].",
+        segmentsToMoveToFixDeviation, tier, minSegmentsToMove, maxSegmentsToMoveInTier
+    );
+
+    final int activeSegmentsToMove = Math.max(minSegmentsToMove, segmentsToMoveToFixDeviation);
+    return Math.min(activeSegmentsToMove, maxSegmentsToMoveInTier);
+  }
+
+  /**
+   * Calculates the minimum number of segments that should be considered for
+   * moving in a tier, so that the cluster is always balancing itself.
+   * <p>
+   * This value must be calculated separately for every tier.
+   *
+   * @param totalSegmentsInTier Total number of all replicas of all segments
+   *                            loaded or queued across all historicals in the tier.
+   * @return {@code minSegmentsToMoveInTier} in the range
+   * [{@link #MIN_SEGMENTS_TO_MOVE}, {@code ~0.6% of totalSegmentsInTier}].
+   */
+  public static int computeMinSegmentsToMoveInTier(int totalSegmentsInTier)
+  {
+    // Divide by 2^14 and multiply by 100 so that the value increases
+    // in steps of 100 for every 2^14 = ~16k segments
+    int upperBound = (totalSegmentsInTier >> 14) * 100;
+    int lowerBound = Math.min(MIN_SEGMENTS_TO_MOVE, totalSegmentsInTier);
+    return Math.max(lowerBound, upperBound);
+  }

Review Comment:
   I think the only really shocking value was the one in this test:
   ```
       Assert.assertEquals(61_000, computeMinSegmentsToMove(10_000_000));
   ```
   Assuming it actually moves that many segments and my math is correct, that's a bit less than $30 a day in S3 GET object requests alone, assuming the cluster is in the same region as the bucket 😅. But I guess with a cluster that big this is probably a tiny fraction of total spend, so maybe it's not a big deal.
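
For reference, here is a minimal sketch that replays the arithmetic of `computeMinSegmentsToMoveInTier` from the diff above for a few tier sizes. The class name `MinSegmentsToMoveSketch` and the sample tier sizes are made up for illustration; only the shift-and-multiply logic is taken from the PR.

```java
// Illustrative only: the class name and sample sizes are assumptions, not part of the PR.
final class MinSegmentsToMoveSketch
{
  private static final int MIN_SEGMENTS_TO_MOVE = 100;

  // Same arithmetic as computeMinSegmentsToMoveInTier in the diff:
  // 100 segments for every full 2^14 (16,384) segments in the tier,
  // but never less than min(100, totalSegmentsInTier).
  static int computeMinSegmentsToMoveInTier(int totalSegmentsInTier)
  {
    int stepped = (totalSegmentsInTier >> 14) * 100;
    int floor = Math.min(MIN_SEGMENTS_TO_MOVE, totalSegmentsInTier);
    return Math.max(floor, stepped);
  }

  public static void main(String[] args)
  {
    int[] tierSizes = {50, 10_000, 100_000, 1_000_000, 10_000_000};
    for (int total : tierSizes) {
      System.out.printf("total=%,d -> min=%,d%n", total, computeMinSegmentsToMoveInTier(total));
    }
  }
}
```

With 10,000,000 segments the shift gives 610 full steps, hence the 61,000 in the test. Tiers smaller than 16,384 segments stay at the floor of 100 (or at the tier size, if that is smaller).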
   
   My casual observation is that very large clusters with hundreds of data nodes usually make most of the moves they are allowed to make, assuming new segments are continuously being created in relatively large numbers, because there is frequently a slightly better server for a given segment than when it was last sampled. I don't have any data to back this up beyond gut feeling, though. It may also be the case only because I haven't actually seen a cluster that big move segments aggressively enough to have a chance of converging on a stable state. Of course it's always going to move stuff when it only looks at tens to hundreds of segments at a time out of millions, at a rate slower than new segments are created (and old ones dropped).
   
   Another idea: we could level off the min scaling as the total number of segments gets very high, e.g. by making the steps smaller.
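
To make the "smaller steps" idea concrete, here is one possible shape, purely as a sketch. The knee at 2^20 segments, the reduced step of 25, and the names `LeveledMinSegmentsSketch` and `computeLeveledMinSegmentsToMove` are all assumptions chosen for illustration. Nothing here is in the PR.

```java
// Hypothetical variant: the knee and step sizes below are illustrative guesses.
final class LeveledMinSegmentsSketch
{
  private static final int MIN_SEGMENTS_TO_MOVE = 100;
  private static final int KNEE_SEGMENTS = 1 << 20;  // assumed point (~1M segments) where scaling slows
  private static final int STEP_BELOW_KNEE = 100;    // same step as the PR
  private static final int STEP_ABOVE_KNEE = 25;     // assumed smaller step past the knee

  static int computeLeveledMinSegmentsToMove(int totalSegmentsInTier)
  {
    // Split the tier size into the part below the knee and the part above it
    int below = Math.min(totalSegmentsInTier, KNEE_SEGMENTS);
    int above = Math.max(0, totalSegmentsInTier - KNEE_SEGMENTS);

    // Full-size steps up to the knee, smaller steps beyond it
    int stepped = (below >> 14) * STEP_BELOW_KNEE + (above >> 14) * STEP_ABOVE_KNEE;
    int floor = Math.min(MIN_SEGMENTS_TO_MOVE, totalSegmentsInTier);
    return Math.max(floor, stepped);
  }

  public static void main(String[] args)
  {
    System.out.println(computeLeveledMinSegmentsToMove(1_000_000));
    System.out.println(computeLeveledMinSegmentsToMove(10_000_000));
  }
}
```

Under these assumed constants, tiers below the knee get the same minimum as in the PR (6,100 for 1,000,000 segments), while a 10,000,000-segment tier gets 20,050 instead of 61,000. Where the knee and the upper step should actually sit would need data from real large clusters.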
   
   However, I'm OK with giving this a shot as it is, too. Maybe we could advise in the release notes that people with huge clusters should keep a careful watch to make sure things are actually chill.






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

