This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d6565f46b0 Increase the computed value of replicationThrottleLimit 
(#14913)
d6565f46b0 is described below

commit d6565f46b00108e008d492cc1df8a58559ea4cd2
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Aug 28 18:20:22 2023 +0530

    Increase the computed value of replicationThrottleLimit (#14913)
    
    Changes
    - Increase value of `replicationThrottleLimit` computed by 
`smartSegmentLoading` from
    2% to 5% of total number of used segments.
    - Assign replicas to a tier even when some replicas are already being 
loaded in that tier
    - Limit the sum of the replicas already in the load queue at the start of a run
    and the replicas assigned during that run to the `replicationThrottleLimit`.
    
    i.e. for every tier,
        num loading replicas at start of run + num replicas assigned in run <= 
replicationThrottleLimit
---
 docs/configuration/index.md                        |  2 +-
 .../coordinator/loading/ReplicationThrottler.java  | 43 ++++++-------
 .../coordinator/loading/SegmentLoadingConfig.java  |  2 +-
 .../loading/StrategicSegmentAssigner.java          | 16 ++---
 .../loading/ReplicationThrottlerTest.java          | 72 ++++++++++++++++++++++
 5 files changed, 101 insertions(+), 34 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index b16e676913..da8ce4235b 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -984,7 +984,7 @@ Druid computes the values to optimize Coordinator 
performance, based on the curr
 |--------|--------------|-----------|
 |`useRoundRobinSegmentAssignment`|true|Speeds up segment assignment.|
 |`maxSegmentsInNodeLoadingQueue`|0|Removes the limit on load queue size.|
-|`replicationThrottleLimit`|2% of used segments, minimum value 100|Prevents 
aggressive replication when a historical disappears only intermittently.|
+|`replicationThrottleLimit`|5% of used segments, minimum value 100|Prevents 
aggressive replication when a historical disappears only intermittently.|
 |`replicantLifetime`|60|Allows segments to wait about an hour (assuming a 
Coordinator period of 1 minute) in the load queue before an alert is raised. In 
`smartSegmentLoading` mode, load queues are not limited by size. Segments might 
therefore be assigned to a load queue even if the corresponding server is slow to 
load them.|
 |`maxNonPrimaryReplicantsToLoad`|`Integer.MAX_VALUE` (no limit)|This 
throttling is already handled by `replicationThrottleLimit`.|
 |`maxSegmentsToMove`|2% of used segments, minimum value 100, maximum value 
1000|Ensures that some segments are always moving in the cluster to keep it 
well balanced. The maximum value keeps the Coordinator run times bounded.|
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/ReplicationThrottler.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/ReplicationThrottler.java
index 988a181707..55f5143f31 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/ReplicationThrottler.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/ReplicationThrottler.java
@@ -19,12 +19,10 @@
 
 package org.apache.druid.server.coordinator.loading;
 
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
 
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * The ReplicationThrottler is used to throttle the number of segment replicas
@@ -43,24 +41,27 @@ public class ReplicationThrottler
   private final int replicationThrottleLimit;
   private final int maxReplicaAssignmentsInRun;
 
-  private final Map<String, Integer> tierToNumAssigned = new HashMap<>();
-  private final Set<String> tiersLoadingReplicas = new HashSet<>();
+  private final Object2IntOpenHashMap<String> tierToNumAssigned = new 
Object2IntOpenHashMap<>();
+  private final Object2IntOpenHashMap<String> tierToMaxAssignments = new 
Object2IntOpenHashMap<>();
 
   private int totalReplicasAssignedInRun;
 
   /**
    * Creates a new ReplicationThrottler for use during a single coordinator 
run.
+   * The number of replicas loading on a tier must always be within the current
+   * {@code replicationThrottleLimit}. Thus, if a tier was already loading 
{@code k}
+   * replicas at the start of a coordinator run, it may be assigned only
+   * {@code replicationThrottleLimit - k} more replicas during the run.
    *
-   * @param tiersLoadingReplicas       Set of tier names which are already 
loading
-   *                                   replicas and will not be eligible for 
loading
-   *                                   more replicas in this run.
+   * @param tierToLoadingReplicaCount  Map from tier name to number of replicas
+   *                                   already being loaded.
    * @param replicationThrottleLimit   Maximum number of replicas that can be
    *                                   assigned to a single tier in the 
current run.
    * @param maxReplicaAssignmentsInRun Max number of total replicas that can be
    *                                   assigned across all tiers in the 
current run.
    */
   public ReplicationThrottler(
-      Set<String> tiersLoadingReplicas,
+      Map<String, Integer> tierToLoadingReplicaCount,
       int replicationThrottleLimit,
       int maxReplicaAssignmentsInRun
   )
@@ -68,27 +69,27 @@ public class ReplicationThrottler
     this.replicationThrottleLimit = replicationThrottleLimit;
     this.maxReplicaAssignmentsInRun = maxReplicaAssignmentsInRun;
     this.totalReplicasAssignedInRun = 0;
-    if (tiersLoadingReplicas != null) {
-      this.tiersLoadingReplicas.addAll(tiersLoadingReplicas);
-    }
-  }
 
-  public boolean isTierLoadingReplicas(String tier)
-  {
-    return tiersLoadingReplicas.contains(tier);
+    if (tierToLoadingReplicaCount != null) {
+      tierToLoadingReplicaCount.forEach(
+          (tier, numLoadingReplicas) -> tierToMaxAssignments.addTo(
+              tier,
+              Math.max(0, replicationThrottleLimit - numLoadingReplicas)
+          )
+      );
+    }
   }
 
-  public boolean canAssignReplica(String tier)
+  public boolean isReplicationThrottledForTier(String tier)
   {
-    return !tiersLoadingReplicas.contains(tier)
-           && totalReplicasAssignedInRun < maxReplicaAssignmentsInRun
-           && tierToNumAssigned.computeIfAbsent(tier, t -> 0) < 
replicationThrottleLimit;
+    return tierToNumAssigned.getInt(tier) >= 
tierToMaxAssignments.getOrDefault(tier, replicationThrottleLimit)
+           || totalReplicasAssignedInRun >= maxReplicaAssignmentsInRun;
   }
 
   public void incrementAssignedReplicas(String tier)
   {
     ++totalReplicasAssignedInRun;
-    tierToNumAssigned.compute(tier, (t, count) -> (count == null) ? 1 : count 
+ 1);
+    tierToNumAssigned.addTo(tier, 1);
   }
 
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
index 25159cc2eb..d1f01043ba 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
@@ -48,7 +48,7 @@ public class SegmentLoadingConfig
   {
     if (dynamicConfig.isSmartSegmentLoading()) {
       // Compute replicationThrottleLimit with a lower bound of 100
-      final int throttlePercentage = 2;
+      final int throttlePercentage = 5;
       final int replicationThrottleLimit = Math.max(100, numUsedSegments * 
throttlePercentage / 100);
       log.info(
           "Smart segment loading is enabled. Calculated 
replicationThrottleLimit[%,d] (%d%% of used segments[%,d]).",
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
index 8bb82c5cdf..0cc57db988 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
@@ -497,7 +497,7 @@ public class StrategicSegmentAssigner implements 
SegmentActionHandler
     final boolean isAlreadyLoadedOnTier = numLoadedReplicas >= 1;
 
     // Do not assign replicas if tier is already busy loading some
-    if (isAlreadyLoadedOnTier && 
replicationThrottler.isTierLoadingReplicas(tier)) {
+    if (isAlreadyLoadedOnTier && 
replicationThrottler.isReplicationThrottledForTier(tier)) {
       return 0;
     }
 
@@ -543,7 +543,7 @@ public class StrategicSegmentAssigner implements 
SegmentActionHandler
   private boolean replicateSegment(DataSegment segment, ServerHolder server)
   {
     final String tier = server.getServer().getTier();
-    if (!replicationThrottler.canAssignReplica(tier)) {
+    if (replicationThrottler.isReplicationThrottledForTier(tier)) {
       incrementSkipStat(Stats.Segments.ASSIGN_SKIPPED, "Throttled 
replication", segment, tier);
       return false;
     }
@@ -563,22 +563,16 @@ public class StrategicSegmentAssigner implements 
SegmentActionHandler
       SegmentLoadingConfig loadingConfig
   )
   {
-    final Set<String> tiersLoadingReplicas = new HashSet<>();
+    final Map<String, Integer> tierToLoadingReplicaCount = new HashMap<>();
 
     cluster.getHistoricals().forEach(
         (tier, historicals) -> {
           int numLoadingReplicas = 
historicals.stream().mapToInt(ServerHolder::getNumLoadingReplicas).sum();
-          if (numLoadingReplicas > 0) {
-            log.info(
-                "Tier [%s] will not be assigned replicas as it is already 
loading [%d] replicas.",
-                tier, numLoadingReplicas
-            );
-            tiersLoadingReplicas.add(tier);
-          }
+          tierToLoadingReplicaCount.put(tier, numLoadingReplicas);
         }
     );
     return new ReplicationThrottler(
-        tiersLoadingReplicas,
+        tierToLoadingReplicaCount,
         loadingConfig.getReplicationThrottleLimit(),
         loadingConfig.getMaxReplicaAssignmentsInRun()
     );
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/loading/ReplicationThrottlerTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/loading/ReplicationThrottlerTest.java
new file mode 100644
index 0000000000..4e1de51d36
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/loading/ReplicationThrottlerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.loading;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ReplicationThrottlerTest
+{
+  private static final String TIER_1 = "t1";
+  private static final String TIER_2 = "t2";
+
+  @Test
+  public void testTierDoesNotViolateThrottleLimit()
+  {
+    final int replicationThrottleLimit = 10;
+    ReplicationThrottler throttler = new ReplicationThrottler(
+        ImmutableMap.of(),
+        replicationThrottleLimit,
+        1000
+    );
+
+    // Verify that both the tiers can be assigned replicas up to the limit
+    for (int i = 0; i < replicationThrottleLimit; ++i) {
+      Assert.assertFalse(throttler.isReplicationThrottledForTier(TIER_1));
+      throttler.incrementAssignedReplicas(TIER_1);
+
+      Assert.assertFalse(throttler.isReplicationThrottledForTier(TIER_2));
+      throttler.incrementAssignedReplicas(TIER_2);
+    }
+  }
+
+  @Test
+  public void testTierWithLoadingReplicasDoesNotViolateThrottleLimit()
+  {
+    final int replicationThrottleLimit = 10;
+    ReplicationThrottler throttler = new ReplicationThrottler(
+        ImmutableMap.of(TIER_1, 10, TIER_2, 7),
+        replicationThrottleLimit,
+        1000
+    );
+
+    // T1 cannot be assigned any more replicas
+    Assert.assertTrue(throttler.isReplicationThrottledForTier(TIER_1));
+
+    // T2 can be assigned replicas until it hits the limit
+    for (int i = 0; i < 3; ++i) {
+      Assert.assertFalse(throttler.isReplicationThrottledForTier(TIER_2));
+      throttler.incrementAssignedReplicas(TIER_2);
+    }
+    Assert.assertTrue(throttler.isReplicationThrottledForTier(TIER_2));
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to