This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d6565f46b0 Increase the computed value of replicationThrottleLimit
(#14913)
d6565f46b0 is described below
commit d6565f46b00108e008d492cc1df8a58559ea4cd2
Author: Kashif Faraz <[email protected]>
AuthorDate: Mon Aug 28 18:20:22 2023 +0530
Increase the computed value of replicationThrottleLimit (#14913)
Changes
- Increase the value of `replicationThrottleLimit` computed by
`smartSegmentLoading` from 2% to 5% of the total number of used segments.
- Assign replicas to a tier even when some replicas are already being
loaded in that tier
- Limit the sum of the replicas already in the load queue at the start of a
run and the replicas assigned during that run to the
`replicationThrottleLimit`, i.e. for every tier:
num loading replicas at start of run + num replicas assigned in run <= replicationThrottleLimit
---
docs/configuration/index.md | 2 +-
.../coordinator/loading/ReplicationThrottler.java | 43 ++++++-------
.../coordinator/loading/SegmentLoadingConfig.java | 2 +-
.../loading/StrategicSegmentAssigner.java | 16 ++---
.../loading/ReplicationThrottlerTest.java | 72 ++++++++++++++++++++++
5 files changed, 101 insertions(+), 34 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index b16e676913..da8ce4235b 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -984,7 +984,7 @@ Druid computes the values to optimize Coordinator
performance, based on the curr
|--------|--------------|-----------|
|`useRoundRobinSegmentAssignment`|true|Speeds up segment assignment.|
|`maxSegmentsInNodeLoadingQueue`|0|Removes the limit on load queue size.|
-|`replicationThrottleLimit`|2% of used segments, minimum value 100|Prevents
aggressive replication when a historical disappears only intermittently.|
+|`replicationThrottleLimit`|5% of used segments, minimum value 100|Prevents
aggressive replication when a historical disappears only intermittently.|
|`replicantLifetime`|60|Allows segments to wait about an hour (assuming a
Coordinator period of 1 minute) in the load queue before an alert is raised. In
`smartSegmentLoading` mode, load queues are not limited by size. Segments might
therefore assigned to a load queue even if the corresponding server is slow to
load them.|
|`maxNonPrimaryReplicantsToLoad`|`Integer.MAX_VALUE` (no limit)|This
throttling is already handled by `replicationThrottleLimit`.|
|`maxSegmentsToMove`|2% of used segments, minimum value 100, maximum value
1000|Ensures that some segments are always moving in the cluster to keep it
well balanced. The maximum value keeps the Coordinator run times bounded.|
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/loading/ReplicationThrottler.java
b/server/src/main/java/org/apache/druid/server/coordinator/loading/ReplicationThrottler.java
index 988a181707..55f5143f31 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/loading/ReplicationThrottler.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/loading/ReplicationThrottler.java
@@ -19,12 +19,10 @@
package org.apache.druid.server.coordinator.loading;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
/**
* The ReplicationThrottler is used to throttle the number of segment replicas
@@ -43,24 +41,27 @@ public class ReplicationThrottler
private final int replicationThrottleLimit;
private final int maxReplicaAssignmentsInRun;
- private final Map<String, Integer> tierToNumAssigned = new HashMap<>();
- private final Set<String> tiersLoadingReplicas = new HashSet<>();
+ private final Object2IntOpenHashMap<String> tierToNumAssigned = new
Object2IntOpenHashMap<>();
+ private final Object2IntOpenHashMap<String> tierToMaxAssignments = new
Object2IntOpenHashMap<>();
private int totalReplicasAssignedInRun;
/**
* Creates a new ReplicationThrottler for use during a single coordinator
run.
+ * The number of replicas loading on a tier must always be within the current
+ * {@code replicationThrottleLimit}. Thus, if a tier was already loading
{@code k}
+ * replicas at the start of a coordinator run, it may be assigned only
+ * {@code replicationThrottleLimit - k} more replicas during the run.
*
- * @param tiersLoadingReplicas Set of tier names which are already
loading
- * replicas and will not be eligible for
loading
- * more replicas in this run.
+ * @param tierToLoadingReplicaCount Map from tier name to number of replicas
+ * already being loaded.
* @param replicationThrottleLimit Maximum number of replicas that can be
* assigned to a single tier in the
current run.
* @param maxReplicaAssignmentsInRun Max number of total replicas that can be
* assigned across all tiers in the
current run.
*/
public ReplicationThrottler(
- Set<String> tiersLoadingReplicas,
+ Map<String, Integer> tierToLoadingReplicaCount,
int replicationThrottleLimit,
int maxReplicaAssignmentsInRun
)
@@ -68,27 +69,27 @@ public class ReplicationThrottler
this.replicationThrottleLimit = replicationThrottleLimit;
this.maxReplicaAssignmentsInRun = maxReplicaAssignmentsInRun;
this.totalReplicasAssignedInRun = 0;
- if (tiersLoadingReplicas != null) {
- this.tiersLoadingReplicas.addAll(tiersLoadingReplicas);
- }
- }
- public boolean isTierLoadingReplicas(String tier)
- {
- return tiersLoadingReplicas.contains(tier);
+ if (tierToLoadingReplicaCount != null) {
+ tierToLoadingReplicaCount.forEach(
+ (tier, numLoadingReplicas) -> tierToMaxAssignments.addTo(
+ tier,
+ Math.max(0, replicationThrottleLimit - numLoadingReplicas)
+ )
+ );
+ }
}
- public boolean canAssignReplica(String tier)
+ public boolean isReplicationThrottledForTier(String tier)
{
- return !tiersLoadingReplicas.contains(tier)
- && totalReplicasAssignedInRun < maxReplicaAssignmentsInRun
- && tierToNumAssigned.computeIfAbsent(tier, t -> 0) <
replicationThrottleLimit;
+ return tierToNumAssigned.getInt(tier) >=
tierToMaxAssignments.getOrDefault(tier, replicationThrottleLimit)
+ || totalReplicasAssignedInRun >= maxReplicaAssignmentsInRun;
}
public void incrementAssignedReplicas(String tier)
{
++totalReplicasAssignedInRun;
- tierToNumAssigned.compute(tier, (t, count) -> (count == null) ? 1 : count
+ 1);
+ tierToNumAssigned.addTo(tier, 1);
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
index 25159cc2eb..d1f01043ba 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/loading/SegmentLoadingConfig.java
@@ -48,7 +48,7 @@ public class SegmentLoadingConfig
{
if (dynamicConfig.isSmartSegmentLoading()) {
// Compute replicationThrottleLimit with a lower bound of 100
- final int throttlePercentage = 2;
+ final int throttlePercentage = 5;
final int replicationThrottleLimit = Math.max(100, numUsedSegments *
throttlePercentage / 100);
log.info(
"Smart segment loading is enabled. Calculated
replicationThrottleLimit[%,d] (%d%% of used segments[%,d]).",
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
index 8bb82c5cdf..0cc57db988 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/loading/StrategicSegmentAssigner.java
@@ -497,7 +497,7 @@ public class StrategicSegmentAssigner implements
SegmentActionHandler
final boolean isAlreadyLoadedOnTier = numLoadedReplicas >= 1;
// Do not assign replicas if tier is already busy loading some
- if (isAlreadyLoadedOnTier &&
replicationThrottler.isTierLoadingReplicas(tier)) {
+ if (isAlreadyLoadedOnTier &&
replicationThrottler.isReplicationThrottledForTier(tier)) {
return 0;
}
@@ -543,7 +543,7 @@ public class StrategicSegmentAssigner implements
SegmentActionHandler
private boolean replicateSegment(DataSegment segment, ServerHolder server)
{
final String tier = server.getServer().getTier();
- if (!replicationThrottler.canAssignReplica(tier)) {
+ if (replicationThrottler.isReplicationThrottledForTier(tier)) {
incrementSkipStat(Stats.Segments.ASSIGN_SKIPPED, "Throttled
replication", segment, tier);
return false;
}
@@ -563,22 +563,16 @@ public class StrategicSegmentAssigner implements
SegmentActionHandler
SegmentLoadingConfig loadingConfig
)
{
- final Set<String> tiersLoadingReplicas = new HashSet<>();
+ final Map<String, Integer> tierToLoadingReplicaCount = new HashMap<>();
cluster.getHistoricals().forEach(
(tier, historicals) -> {
int numLoadingReplicas =
historicals.stream().mapToInt(ServerHolder::getNumLoadingReplicas).sum();
- if (numLoadingReplicas > 0) {
- log.info(
- "Tier [%s] will not be assigned replicas as it is already
loading [%d] replicas.",
- tier, numLoadingReplicas
- );
- tiersLoadingReplicas.add(tier);
- }
+ tierToLoadingReplicaCount.put(tier, numLoadingReplicas);
}
);
return new ReplicationThrottler(
- tiersLoadingReplicas,
+ tierToLoadingReplicaCount,
loadingConfig.getReplicationThrottleLimit(),
loadingConfig.getMaxReplicaAssignmentsInRun()
);
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/loading/ReplicationThrottlerTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/loading/ReplicationThrottlerTest.java
new file mode 100644
index 0000000000..4e1de51d36
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/server/coordinator/loading/ReplicationThrottlerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.loading;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ReplicationThrottlerTest
+{
+ private static final String TIER_1 = "t1";
+ private static final String TIER_2 = "t2";
+
+ @Test
+ public void testTierDoesNotViolateThrottleLimit()
+ {
+ final int replicationThrottleLimit = 10;
+ ReplicationThrottler throttler = new ReplicationThrottler(
+ ImmutableMap.of(),
+ replicationThrottleLimit,
+ 1000
+ );
+
+    // Verify that both the tiers can be assigned replicas up to the limit
+ for (int i = 0; i < replicationThrottleLimit; ++i) {
+ Assert.assertFalse(throttler.isReplicationThrottledForTier(TIER_1));
+ throttler.incrementAssignedReplicas(TIER_1);
+
+ Assert.assertFalse(throttler.isReplicationThrottledForTier(TIER_2));
+ throttler.incrementAssignedReplicas(TIER_2);
+ }
+ }
+
+ @Test
+ public void testTierWithLoadingReplicasDoesNotViolateThrottleLimit()
+ {
+ final int replicationThrottleLimit = 10;
+ ReplicationThrottler throttler = new ReplicationThrottler(
+ ImmutableMap.of(TIER_1, 10, TIER_2, 7),
+ replicationThrottleLimit,
+ 1000
+ );
+
+ // T1 cannot be assigned any more replicas
+ Assert.assertTrue(throttler.isReplicationThrottledForTier(TIER_1));
+
+ // T2 can be assigned replicas until it hits the limit
+ for (int i = 0; i < 3; ++i) {
+ Assert.assertFalse(throttler.isReplicationThrottledForTier(TIER_2));
+ throttler.incrementAssignedReplicas(TIER_2);
+ }
+ Assert.assertTrue(throttler.isReplicationThrottledForTier(TIER_2));
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]