This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e423e99 Update default maxSegmentsInNodeLoadingQueue (#11540)
e423e99 is described below
commit e423e99997e0faf54856fb6b525bc0c03471e78a
Author: Suneet Saldanha <[email protected]>
AuthorDate: Thu Aug 5 14:26:58 2021 -0400
Update default maxSegmentsInNodeLoadingQueue (#11540)
* Update default maxSegmentsInNodeLoadingQueue
Update the default maxSegmentsInNodeLoadingQueue from 0 (unbounded) to 100.
An unbounded maxSegmentsInNodeLoadingQueue can cause cluster instability.
Since this is the default, Druid operators end up running into this instability
and then have to look through the docs to discover that the recommended value
for a large cluster is 1000. This change makes the default itself prevent
clusters from falling over as they grow over time.
* update tests
* codestyle
---
docs/configuration/index.md | 2 +-
.../coordinator/CoordinatorDynamicConfig.java | 36 ++-
.../server/http/CoordinatorDynamicConfigTest.java | 302 +++++++++++++++++++--
3 files changed, 309 insertions(+), 31 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 8caaf1c..09b336d 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -840,7 +840,7 @@ Issuing a GET request at the same URL will return the spec
that is currently in
|`killDataSourceWhitelist`|List of specific data sources for which kill tasks
are sent if property `druid.coordinator.kill.on` is true. This can be a list of
comma-separated data source names or a JSON array.|none|
|`killAllDataSources`|Send kill tasks for ALL dataSources if property
`druid.coordinator.kill.on` is true. If this is set to true then
`killDataSourceWhitelist` must not be specified or be empty list.|false|
|`killPendingSegmentsSkipList`|List of data sources for which pendingSegments
are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is
true. This can be a list of comma-separated data sources or a JSON array.|none|
-|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be
queued for loading to any given server. This parameter could be used to speed
up segments loading process, especially if there are "slow" nodes in the
cluster (with low loading speed) or if too much segments scheduled to be
replicated to some particular node (faster loading could be preferred to better
segments distribution). Desired value depends on segments loading speed,
acceptable replication time and numbe [...]
+|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be
queued for loading to any given server. This parameter could be used to speed
up segments loading process, especially if there are "slow" nodes in the
cluster (with low loading speed) or if too many segments are scheduled to be
replicated to some particular node (faster loading could be preferred to better
segments distribution). Desired value depends on segments loading speed,
acceptable replication time and numbe [...]
|`decommissioningNodes`| List of historical servers to 'decommission'.
Coordinator will not assign new segments to 'decommissioning' servers, and
segments will be moved away from them to be placed on non-decommissioning
servers at the maximum rate specified by
`decommissioningMaxPercentOfMaxSegmentsToMove`.|none|
|`decommissioningMaxPercentOfMaxSegmentsToMove`| The maximum number of
segments that may be moved away from 'decommissioning' servers to
non-decommissioning (that is, active) servers during one Coordinator run. This
value is relative to the total maximum segment movements allowed during one run
which is determined by `maxSegmentsToMove`. If
`decommissioningMaxPercentOfMaxSegmentsToMove` is 0, segments will neither be
moved from _or to_ 'decommissioning' servers, effectively putting them [...]
|`pauseCoordination`| Boolean flag for whether or not the coordinator should
execute its various duties of coordinating the cluster. Setting this to true
essentially pauses all coordination work while allowing the API to remain up.
Duties that are paused include all classes that implement the `CoordinatorDuty`
Interface. Such duties include: Segment balancing, Segment compaction, Emission
of metrics controlled by the dynamic coordinator config `emitBalancingStats`,
Submitting kill tasks [...]
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
index d4bf3e8..29d8052 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
@@ -61,7 +61,9 @@ public class CoordinatorDynamicConfig
private final int balancerComputeThreads;
private final boolean emitBalancingStats;
- /** If true {@link KillUnusedSegments} sends kill tasks for unused segments
in all data sources. */
+ /**
+ * If true {@link KillUnusedSegments} sends kill tasks for unused segments
in all data sources.
+ */
private final boolean killUnusedSegmentsInAllDataSources;
/**
@@ -74,7 +76,7 @@ public class CoordinatorDynamicConfig
/**
* Stale pending segments belonging to the data sources in this list are not
killed by {@link
* KillStalePendingSegments}. In other words, segments in these data sources
are "protected".
- *
+ * <p>
* Pending segments are considered "stale" when their created_time is older
than {@link
* KillStalePendingSegments#KEEP_PENDING_SEGMENTS_OFFSET} from now.
*/
@@ -134,7 +136,7 @@ public class CoordinatorDynamicConfig
// Keeping the legacy 'killPendingSegmentsSkipList' property name for
backward compatibility. When the project is
// updated to Jackson 2.9 it could be changed, see
https://github.com/apache/druid/issues/7152
@JsonProperty("killPendingSegmentsSkipList") Object
dataSourcesToNotKillStalePendingSegmentsIn,
- @JsonProperty("maxSegmentsInNodeLoadingQueue") int
maxSegmentsInNodeLoadingQueue,
+ @JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer
maxSegmentsInNodeLoadingQueue,
@JsonProperty("decommissioningNodes") Object decommissioningNodes,
@JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int
decommissioningMaxPercentOfMaxSegmentsToMove,
@JsonProperty("pauseCoordination") boolean pauseCoordination,
@@ -149,11 +151,12 @@ public class CoordinatorDynamicConfig
this.maxSegmentsToMove = maxSegmentsToMove;
if (percentOfSegmentsToConsiderPerMove == null) {
- log.debug("percentOfSegmentsToConsiderPerMove was null! This is likely
because your metastore does not "
- + "reflect this configuration being added to Druid in a recent
release. Druid is defaulting the value "
- + "to the Druid default of %f. It is recommended that you
re-submit your dynamic config with your "
- + "desired value for percentOfSegmentsToConsideredPerMove",
- Builder.DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
+ log.debug(
+ "percentOfSegmentsToConsiderPerMove was null! This is likely because
your metastore does not "
+ + "reflect this configuration being added to Druid in a recent
release. Druid is defaulting the value "
+ + "to the Druid default of %f. It is recommended that you re-submit
your dynamic config with your "
+ + "desired value for percentOfSegmentsToConsideredPerMove",
+ Builder.DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
);
percentOfSegmentsToConsiderPerMove =
Builder.DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE;
}
@@ -172,7 +175,9 @@ public class CoordinatorDynamicConfig
this.specificDataSourcesToKillUnusedSegmentsIn =
parseJsonStringOrArray(specificDataSourcesToKillUnusedSegmentsIn);
this.dataSourcesToNotKillStalePendingSegmentsIn =
parseJsonStringOrArray(dataSourcesToNotKillStalePendingSegmentsIn);
- this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
+ this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue == null
+ ?
Builder.DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE
+ : maxSegmentsInNodeLoadingQueue;
this.decommissioningNodes = parseJsonStringOrArray(decommissioningNodes);
Preconditions.checkArgument(
decommissioningMaxPercentOfMaxSegmentsToMove >= 0 &&
decommissioningMaxPercentOfMaxSegmentsToMove <= 100,
@@ -517,7 +522,7 @@ public class CoordinatorDynamicConfig
private static final boolean DEFAULT_EMIT_BALANCING_STATS = false;
private static final boolean DEFAULT_USE_BATCHED_SEGMENT_SAMPLER = false;
private static final boolean
DEFAULT_KILL_UNUSED_SEGMENTS_IN_ALL_DATA_SOURCES = false;
- private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 0;
+ private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 100;
private static final int
DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70;
private static final boolean DEFAULT_PAUSE_COORDINATION = false;
private static final boolean DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT = false;
@@ -732,7 +737,8 @@ public class CoordinatorDynamicConfig
: decommissioningMaxPercentOfMaxSegmentsToMove,
pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION :
pauseCoordination,
replicateAfterLoadTimeout == null ?
DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout,
- maxNonPrimaryReplicantsToLoad == null ?
DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD : maxNonPrimaryReplicantsToLoad
+ maxNonPrimaryReplicantsToLoad == null ?
DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD
+ : maxNonPrimaryReplicantsToLoad
);
}
@@ -745,7 +751,9 @@ public class CoordinatorDynamicConfig
mergeBytesLimit == null ? defaults.getMergeBytesLimit() :
mergeBytesLimit,
mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() :
mergeSegmentsLimit,
maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() :
maxSegmentsToMove,
- percentOfSegmentsToConsiderPerMove == null ?
defaults.getPercentOfSegmentsToConsiderPerMove() :
percentOfSegmentsToConsiderPerMove,
+ percentOfSegmentsToConsiderPerMove == null
+ ? defaults.getPercentOfSegmentsToConsiderPerMove()
+ : percentOfSegmentsToConsiderPerMove,
useBatchedSegmentSampler == null ?
defaults.useBatchedSegmentSampler() : useBatchedSegmentSampler,
replicantLifetime == null ? defaults.getReplicantLifetime() :
replicantLifetime,
replicationThrottleLimit == null ?
defaults.getReplicationThrottleLimit() : replicationThrottleLimit,
@@ -769,7 +777,9 @@ public class CoordinatorDynamicConfig
: decommissioningMaxPercentOfMaxSegmentsToMove,
pauseCoordination == null ? defaults.getPauseCoordination() :
pauseCoordination,
replicateAfterLoadTimeout == null ?
defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout,
- maxNonPrimaryReplicantsToLoad == null ?
defaults.getMaxNonPrimaryReplicantsToLoad() : maxNonPrimaryReplicantsToLoad
+ maxNonPrimaryReplicantsToLoad == null
+ ? defaults.getMaxNonPrimaryReplicantsToLoad()
+ : maxNonPrimaryReplicantsToLoad
);
}
}
diff --git
a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
index b6776a8..b96591a 100644
---
a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
+++
b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
@@ -31,9 +31,12 @@ import org.junit.Test;
import java.util.Set;
/**
+ *
*/
public class CoordinatorDynamicConfigTest
{
+ private static final int EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE
= 100;
+
private final ObjectMapper mapper = TestHelper.makeJsonMapper();
@Test
@@ -69,25 +72,158 @@ public class CoordinatorDynamicConfigTest
);
ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2");
ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
- assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1,
decommissioning, 9, false, false, Integer.MAX_VALUE);
+ assertConfig(
+ actual,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 2,
+ true,
+ whitelist,
+ false,
+ 1,
+ decommissioning,
+ 9,
+ false,
+ false,
+ Integer.MAX_VALUE
+ );
actual =
CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
- assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1,
ImmutableSet.of("host1"), 9, false, false, Integer.MAX_VALUE);
+ assertConfig(
+ actual,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 2,
+ true,
+ whitelist,
+ false,
+ 1,
+ ImmutableSet.of("host1"),
+ 9,
+ false,
+ false,
+ Integer.MAX_VALUE
+ );
actual =
CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual);
- assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1,
ImmutableSet.of("host1"), 5, false, false, Integer.MAX_VALUE);
+ assertConfig(
+ actual,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 2,
+ true,
+ whitelist,
+ false,
+ 1,
+ ImmutableSet.of("host1"),
+ 5,
+ false,
+ false,
+ Integer.MAX_VALUE
+ );
actual =
CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual);
- assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1,
ImmutableSet.of("host1"), 5, true, false, Integer.MAX_VALUE);
+ assertConfig(
+ actual,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 2,
+ true,
+ whitelist,
+ false,
+ 1,
+ ImmutableSet.of("host1"),
+ 5,
+ true,
+ false,
+ Integer.MAX_VALUE
+ );
actual =
CoordinatorDynamicConfig.builder().withPercentOfSegmentsToConsiderPerMove(10).build(actual);
- assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1,
ImmutableSet.of("host1"), 5, true, false, Integer.MAX_VALUE);
+ assertConfig(
+ actual,
+ 1,
+ 1,
+ 1,
+ 1,
+ 10,
+ 1,
+ 1,
+ 2,
+ true,
+ whitelist,
+ false,
+ 1,
+ ImmutableSet.of("host1"),
+ 5,
+ true,
+ false,
+ Integer.MAX_VALUE
+ );
actual =
CoordinatorDynamicConfig.builder().withReplicateAfterLoadTimeout(true).build(actual);
- assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1,
ImmutableSet.of("host1"), 5, true, true, Integer.MAX_VALUE);
+ assertConfig(
+ actual,
+ 1,
+ 1,
+ 1,
+ 1,
+ 10,
+ 1,
+ 1,
+ 2,
+ true,
+ whitelist,
+ false,
+ 1,
+ ImmutableSet.of("host1"),
+ 5,
+ true,
+ true,
+ Integer.MAX_VALUE
+ );
actual =
CoordinatorDynamicConfig.builder().withMaxNonPrimaryReplicantsToLoad(10).build(actual);
- assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1,
ImmutableSet.of("host1"), 5, true, true, 10);
+ assertConfig(
+ actual,
+ 1,
+ 1,
+ 1,
+ 1,
+ 10,
+ 1,
+ 1,
+ 2,
+ true,
+ whitelist,
+ false,
+ 1,
+ ImmutableSet.of("host1"),
+ 5,
+ true,
+ true,
+ 10
+ );
}
@@ -118,13 +254,70 @@ public class CoordinatorDynamicConfigTest
);
ImmutableSet<String> decommissioning = ImmutableSet.of();
ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
- assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1,
decommissioning, 0, false, false, Integer.MAX_VALUE);
+ assertConfig(
+ actual,
+ 1,
+ 1,
+ 1,
+ 1,
+ 100,
+ 1,
+ 1,
+ 2,
+ true,
+ whitelist,
+ false,
+ 1,
+ decommissioning,
+ 0,
+ false,
+ false,
+ Integer.MAX_VALUE
+ );
actual =
CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
- assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1,
ImmutableSet.of("host1"), 0, false, false, Integer.MAX_VALUE);
+ assertConfig(
+ actual,
+ 1,
+ 1,
+ 1,
+ 1,
+ 100,
+ 1,
+ 1,
+ 2,
+ true,
+ whitelist,
+ false,
+ 1,
+ ImmutableSet.of("host1"),
+ 0,
+ false,
+ false,
+ Integer.MAX_VALUE
+ );
actual =
CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual);
- assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1,
ImmutableSet.of("host1"), 5, false, false, Integer.MAX_VALUE);
+ assertConfig(
+ actual,
+ 1,
+ 1,
+ 1,
+ 1,
+ 100,
+ 1,
+ 1,
+ 2,
+ true,
+ whitelist,
+ false,
+ 1,
+ ImmutableSet.of("host1"),
+ 5,
+ false,
+ false,
+ Integer.MAX_VALUE
+ );
}
@Test
@@ -271,7 +464,26 @@ public class CoordinatorDynamicConfigTest
);
ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2");
ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
- assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1,
decommissioning, 9, false, false, Integer.MAX_VALUE);
+ assertConfig(
+ actual,
+ 1,
+ 1,
+ 1,
+ 1,
+ 100,
+ 1,
+ 1,
+ 2,
+ true,
+ whitelist,
+ false,
+ 1,
+ decommissioning,
+ 9,
+ false,
+ false,
+ Integer.MAX_VALUE
+ );
}
@Test
@@ -301,7 +513,26 @@ public class CoordinatorDynamicConfigTest
CoordinatorDynamicConfig.class
);
- assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(),
true, 1, ImmutableSet.of(), 0, false, false, Integer.MAX_VALUE);
+ assertConfig(
+ actual,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 2,
+ true,
+ ImmutableSet.of(),
+ true,
+ 1,
+ ImmutableSet.of(),
+ 0,
+ false,
+ false,
+ Integer.MAX_VALUE
+ );
//ensure whitelist is empty when killAllDataSources is true
try {
@@ -348,7 +579,26 @@ public class CoordinatorDynamicConfigTest
CoordinatorDynamicConfig.class
);
- assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(),
true, 0, ImmutableSet.of(), 0, false, false, Integer.MAX_VALUE);
+ assertConfig(
+ actual,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 1,
+ 2,
+ true,
+ ImmutableSet.of(),
+ true,
+ EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
+ ImmutableSet.of(),
+ 0,
+ false,
+ false,
+ Integer.MAX_VALUE
+ );
}
@Test
@@ -369,7 +619,7 @@ public class CoordinatorDynamicConfigTest
false,
emptyList,
false,
- 0,
+ EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
emptyList,
70,
false,
@@ -388,9 +638,27 @@ public class CoordinatorDynamicConfigTest
Assert.assertEquals(
current,
- new CoordinatorDynamicConfig
- .Builder(null, null, null, null, null, null, null, null, null,
null, null, null, null, null, null, null, null, null, null)
- .build(current)
+ new CoordinatorDynamicConfig.Builder(
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ ).build(current)
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]