diff --git a/docs/content/configuration/index.md
b/docs/content/configuration/index.md
index 77a89abcdbb..4d52bc9392b 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -766,6 +766,7 @@ Issuing a GET request at the same URL will return the spec
that is currently in
|`killAllDataSources`|Send kill tasks for ALL dataSources if property
`druid.coordinator.kill.on` is true. If this is set to true then
`killDataSourceWhitelist` must not be specified or be empty list.|false|
|`killPendingSegmentsSkipList`|List of dataSources for which pendingSegments
are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is
true. This can be a list of comma-separated dataSources or a JSON array.|none|
|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be
queued for loading to any given server. This parameter could be used to speed
up segments loading process, especially if there are "slow" nodes in the
cluster (with low loading speed) or if too much segments scheduled to be
replicated to some particular node (faster loading could be preferred to better
segments distribution). Desired value depends on segments loading speed,
acceptable replication time and number of nodes. Value 1000 could be a start
point for a rather big cluster. Default value is 0 (loading queue is unbounded)
|0|
+|`balancerNodeLimit`|The maximum number of nodes considered when rebalancing.
Of all nodes available during rebalancing, one random subsample of at most this
many nodes is considered for moving segments off of, and a second, separately
drawn random subsample is considered for moving segments onto. A value of 0 or
less disables the limit, so all nodes are considered.|0 (no limit, all nodes considered)|
To view the audit history of coordinator dynamic config issue a GET request to
the URL -
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
index c51a04a8dea..31add37fd4c 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
@@ -67,6 +67,7 @@
* See {@link LoadQueuePeon}, {@link
org.apache.druid.server.coordinator.rules.LoadRule#run}
*/
private final int maxSegmentsInNodeLoadingQueue;
+ private final int balancerNodeLimit;
@JsonCreator
public CoordinatorDynamicConfig(
@@ -85,7 +86,8 @@ public CoordinatorDynamicConfig(
@JsonProperty("killDataSourceWhitelist") Object killDataSourceWhitelist,
@JsonProperty("killAllDataSources") boolean killAllDataSources,
@JsonProperty("killPendingSegmentsSkipList") Object
killPendingSegmentsSkipList,
- @JsonProperty("maxSegmentsInNodeLoadingQueue") int
maxSegmentsInNodeLoadingQueue
+ @JsonProperty("maxSegmentsInNodeLoadingQueue") int
maxSegmentsInNodeLoadingQueue,
+ @JsonProperty("balancerNodeLimit") int balancerNodeLimit
)
{
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
@@ -100,6 +102,7 @@ public CoordinatorDynamicConfig(
this.killDataSourceWhitelist =
parseJsonStringOrArray(killDataSourceWhitelist);
this.killPendingSegmentsSkipList =
parseJsonStringOrArray(killPendingSegmentsSkipList);
this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
+ this.balancerNodeLimit = balancerNodeLimit;
if (this.killAllDataSources && !this.killDataSourceWhitelist.isEmpty()) {
throw new IAE("can't have killAllDataSources and non-empty
killDataSourceWhitelist");
@@ -140,6 +143,11 @@ public static CoordinatorDynamicConfig current(final
JacksonConfigManager config
return Preconditions.checkNotNull(watch(configManager).get(), "Got null
config from watcher?!");
}
+ public static Builder builder()
+ {
+ return new Builder();
+ }
+
@JsonProperty
public long getMillisToWaitBeforeDeleting()
{
@@ -212,6 +220,12 @@ public int getMaxSegmentsInNodeLoadingQueue()
return maxSegmentsInNodeLoadingQueue;
}
+ @JsonProperty
+ public int getBalancerNodeLimit()
+ {
+ return balancerNodeLimit;
+ }
+
@Override
public String toString()
{
@@ -240,43 +254,20 @@ public boolean equals(Object o)
if (o == null || getClass() != o.getClass()) {
return false;
}
-
CoordinatorDynamicConfig that = (CoordinatorDynamicConfig) o;
-
- if (millisToWaitBeforeDeleting != that.millisToWaitBeforeDeleting) {
- return false;
- }
- if (mergeBytesLimit != that.mergeBytesLimit) {
- return false;
- }
- if (mergeSegmentsLimit != that.mergeSegmentsLimit) {
- return false;
- }
- if (maxSegmentsToMove != that.maxSegmentsToMove) {
- return false;
- }
- if (replicantLifetime != that.replicantLifetime) {
- return false;
- }
- if (replicationThrottleLimit != that.replicationThrottleLimit) {
- return false;
- }
- if (balancerComputeThreads != that.balancerComputeThreads) {
- return false;
- }
- if (emitBalancingStats != that.emitBalancingStats) {
- return false;
- }
- if (killAllDataSources != that.killAllDataSources) {
- return false;
- }
- if (maxSegmentsInNodeLoadingQueue != that.maxSegmentsInNodeLoadingQueue) {
- return false;
- }
- if (!Objects.equals(killDataSourceWhitelist,
that.killDataSourceWhitelist)) {
- return false;
- }
- return Objects.equals(killPendingSegmentsSkipList,
that.killPendingSegmentsSkipList);
+ return millisToWaitBeforeDeleting == that.millisToWaitBeforeDeleting &&
+ mergeBytesLimit == that.mergeBytesLimit &&
+ mergeSegmentsLimit == that.mergeSegmentsLimit &&
+ maxSegmentsToMove == that.maxSegmentsToMove &&
+ replicantLifetime == that.replicantLifetime &&
+ replicationThrottleLimit == that.replicationThrottleLimit &&
+ balancerComputeThreads == that.balancerComputeThreads &&
+ emitBalancingStats == that.emitBalancingStats &&
+ killAllDataSources == that.killAllDataSources &&
+ maxSegmentsInNodeLoadingQueue == that.maxSegmentsInNodeLoadingQueue
&&
+ balancerNodeLimit == that.balancerNodeLimit &&
+ killDataSourceWhitelist.equals(that.killDataSourceWhitelist) &&
+
killPendingSegmentsSkipList.equals(that.killPendingSegmentsSkipList);
}
@Override
@@ -292,17 +283,13 @@ public int hashCode()
balancerComputeThreads,
emitBalancingStats,
killAllDataSources,
- maxSegmentsInNodeLoadingQueue,
killDataSourceWhitelist,
- killPendingSegmentsSkipList
+ killPendingSegmentsSkipList,
+ maxSegmentsInNodeLoadingQueue,
+ balancerNodeLimit
);
}
- public static Builder builder()
- {
- return new Builder();
- }
-
public static class Builder
{
private static final long DEFAULT_MILLIS_TO_WAIT_BEFORE_DELETING =
TimeUnit.MINUTES.toMillis(15);
@@ -315,6 +302,7 @@ public static Builder builder()
private static final boolean DEFAULT_EMIT_BALANCING_STATS = false;
private static final boolean DEFAULT_KILL_ALL_DATA_SOURCES = false;
private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 0;
+ private static final int DEFAULT_BALANCER_NODE_LIMIT = 0;
private Long millisToWaitBeforeDeleting;
private Long mergeBytesLimit;
@@ -328,6 +316,7 @@ public static Builder builder()
private Boolean killAllDataSources;
private Object killPendingSegmentsSkipList;
private Integer maxSegmentsInNodeLoadingQueue;
+ private Integer balancerNodeLimit;
public Builder()
{
@@ -346,7 +335,8 @@ public Builder(
@JsonProperty("killDataSourceWhitelist") @Nullable Object
killDataSourceWhitelist,
@JsonProperty("killAllDataSources") @Nullable Boolean
killAllDataSources,
@JsonProperty("killPendingSegmentsSkipList") @Nullable Object
killPendingSegmentsSkipList,
- @JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer
maxSegmentsInNodeLoadingQueue
+ @JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer
maxSegmentsInNodeLoadingQueue,
+ @JsonProperty("balancerNodeLimit") @Nullable Integer balancerNodeLimit
)
{
this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
@@ -361,6 +351,7 @@ public Builder(
this.killDataSourceWhitelist = killDataSourceWhitelist;
this.killPendingSegmentsSkipList = killPendingSegmentsSkipList;
this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
+ this.balancerNodeLimit = balancerNodeLimit;
}
public Builder withMillisToWaitBeforeDeleting(long
millisToWaitBeforeDeleting)
@@ -429,6 +420,12 @@ public Builder withMaxSegmentsInNodeLoadingQueue(int
maxSegmentsInNodeLoadingQue
return this;
}
+ public Builder withBalancerNodeLimit(int balancerNodeLimit)
+ {
+ this.balancerNodeLimit = balancerNodeLimit;
+ return this;
+ }
+
public CoordinatorDynamicConfig build()
{
return new CoordinatorDynamicConfig(
@@ -445,7 +442,8 @@ public CoordinatorDynamicConfig build()
killPendingSegmentsSkipList,
maxSegmentsInNodeLoadingQueue == null
? DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE
- : maxSegmentsInNodeLoadingQueue
+ : maxSegmentsInNodeLoadingQueue,
+ balancerNodeLimit == null ? DEFAULT_BALANCER_NODE_LIMIT :
balancerNodeLimit
);
}
@@ -465,7 +463,8 @@ public CoordinatorDynamicConfig
build(CoordinatorDynamicConfig defaults)
killPendingSegmentsSkipList == null ?
defaults.getKillPendingSegmentsSkipList() : killPendingSegmentsSkipList,
maxSegmentsInNodeLoadingQueue == null
? defaults.getMaxSegmentsInNodeLoadingQueue()
- : maxSegmentsInNodeLoadingQueue
+ : maxSegmentsInNodeLoadingQueue,
+ balancerNodeLimit == null ? defaults.getBalancerNodeLimit() :
balancerNodeLimit
);
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
index 129b4535085..dae6fe3e108 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
@@ -33,6 +33,7 @@
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.timeline.DataSegment;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -43,6 +44,7 @@
import java.util.stream.Collectors;
/**
+ *
*/
public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
{
@@ -106,8 +108,18 @@ private void balanceTier(
return;
}
- final List<ServerHolder> toMoveFrom = Lists.newArrayList(servers);
- final List<ServerHolder> toMoveTo = Lists.newArrayList(servers);
+ // Sort order is ascending based on available size. We reverse the "from"
to shed the largest nodes first
+ final List<ServerHolder> toMoveFrom;
+ final List<ServerHolder> toMoveTo;
+
+ final int nodeLimit =
params.getCoordinatorDynamicConfig().getBalancerNodeLimit();
+ if (nodeLimit > 0) {
+ toMoveFrom = Lists.reverse(randomSortedSubList(servers, nodeLimit));
+ toMoveTo = randomSortedSubList(servers, nodeLimit);
+ } else {
+ toMoveFrom = Lists.reverse(Lists.newArrayList(servers));
+ toMoveTo = Lists.newArrayList(servers);
+ }
if (toMoveTo.size() <= 1) {
log.info("[%s]: One or fewer servers found. Cannot balance.", tier);
@@ -169,7 +181,9 @@ private void balanceTier(
if (iter >= maxIterations) {
log.info(
"Unable to select %d remaining candidate segments out of %d total
to balance after %d iterations, ending run.",
- (maxSegmentsToMove - moved - unmoved), maxSegmentsToMove, iter
+ (maxSegmentsToMove - moved - unmoved),
+ maxSegmentsToMove,
+ iter
);
break;
}
@@ -192,6 +206,13 @@ private void balanceTier(
);
}
+ static <T> List<T> randomSortedSubList(Iterable<T> in, int limit)
+ {
+ final List<T> toShuffle = Lists.newArrayList(in);
+ Collections.shuffle(toShuffle);
+ return
toShuffle.stream().limit(limit).sorted().collect(Collectors.toList());
+ }
+
protected void moveSegment(
final BalancerSegmentHolder segment,
final ImmutableDruidServer toServer,
diff --git
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
index 0996058b4ad..61059d3df8e 100644
---
a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
@@ -1423,6 +1423,7 @@ private CoordinatorDynamicConfig
createCoordinatorDynamicConfig()
.withEmitBalancingStats(false)
.withKillDataSourceWhitelist(null)
.withKillAllDataSources(false)
+ .withBalancerNodeLimit(0)
.withMaxSegmentsInNodeLoadingQueue(1000)
.build();
}
diff --git
a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
index 002e1977c15..3a79bb3bd6d 100644
---
a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
+++
b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
@@ -184,7 +184,7 @@ public void testUpdate()
Assert.assertEquals(
current,
new CoordinatorDynamicConfig
- .Builder(null, null, null, null, null, null, null, null, null,
null, null, null)
+ .Builder(null, null, null, null, null, null, null, null, null,
null, null, null, null)
.build(current)
);
}
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]