This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e423e99  Update default maxSegmentsInNodeLoadingQueue (#11540)
e423e99 is described below

commit e423e99997e0faf54856fb6b525bc0c03471e78a
Author: Suneet Saldanha <[email protected]>
AuthorDate: Thu Aug 5 14:26:58 2021 -0400

    Update default maxSegmentsInNodeLoadingQueue (#11540)
    
    * Update default maxSegmentsInNodeLoadingQueue
    
    Update the default maxSegmentsInNodeLoadingQueue from 0 (unbounded) to 100.
    
    An unbounded maxSegmentsInNodeLoadingQueue can cause cluster instability.
    Since this is the default, Druid operators first run into this instability
    and only then look through the docs to see that the recommended value for a
    large cluster is 1000. This change makes it so the default will prevent
    clusters from falling over as they grow over time.
    
    * update tests
    
    * codestyle
---
 docs/configuration/index.md                        |   2 +-
 .../coordinator/CoordinatorDynamicConfig.java      |  36 ++-
 .../server/http/CoordinatorDynamicConfigTest.java  | 302 +++++++++++++++++++--
 3 files changed, 309 insertions(+), 31 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 8caaf1c..09b336d 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -840,7 +840,7 @@ Issuing a GET request at the same URL will return the spec 
that is currently in
 |`killDataSourceWhitelist`|List of specific data sources for which kill tasks 
are sent if property `druid.coordinator.kill.on` is true. This can be a list of 
comma-separated data source names or a JSON array.|none|
 |`killAllDataSources`|Send kill tasks for ALL dataSources if property 
`druid.coordinator.kill.on` is true. If this is set to true then 
`killDataSourceWhitelist` must not be specified or be empty list.|false|
 |`killPendingSegmentsSkipList`|List of data sources for which pendingSegments 
are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is 
true. This can be a list of comma-separated data sources or a JSON array.|none|
-|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be 
queued for loading to any given server. This parameter could be used to speed 
up segments loading process, especially if there are "slow" nodes in the 
cluster (with low loading speed) or if too much segments scheduled to be 
replicated to some particular node (faster loading could be preferred to better 
segments distribution). Desired value depends on segments loading speed, 
acceptable replication time and numbe [...]
+|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be 
queued for loading to any given server. This parameter could be used to speed 
up segments loading process, especially if there are "slow" nodes in the 
cluster (with low loading speed) or if too much segments scheduled to be 
replicated to some particular node (faster loading could be preferred to better 
segments distribution). Desired value depends on segments loading speed, 
acceptable replication time and numbe [...]
 |`decommissioningNodes`| List of historical servers to 'decommission'. 
Coordinator will not assign new segments to 'decommissioning' servers,  and 
segments will be moved away from them to be placed on non-decommissioning 
servers at the maximum rate specified by 
`decommissioningMaxPercentOfMaxSegmentsToMove`.|none|
 |`decommissioningMaxPercentOfMaxSegmentsToMove`|  The maximum number of 
segments that may be moved away from 'decommissioning' servers to 
non-decommissioning (that is, active) servers during one Coordinator run. This 
value is relative to the total maximum segment movements allowed during one run 
which is determined by `maxSegmentsToMove`. If 
`decommissioningMaxPercentOfMaxSegmentsToMove` is 0, segments will neither be 
moved from _or to_ 'decommissioning' servers, effectively putting them [...]
 |`pauseCoordination`| Boolean flag for whether or not the coordinator should 
execute its various duties of coordinating the cluster. Setting this to true 
essentially pauses all coordination work while allowing the API to remain up. 
Duties that are paused include all classes that implement the `CoordinatorDuty` 
Interface. Such duties include: Segment balancing, Segment compaction, Emission 
of metrics controlled by the dynamic coordinator config `emitBalancingStats`, 
Submitting kill tasks  [...]
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
index d4bf3e8..29d8052 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
@@ -61,7 +61,9 @@ public class CoordinatorDynamicConfig
   private final int balancerComputeThreads;
   private final boolean emitBalancingStats;
 
-  /** If true {@link KillUnusedSegments} sends kill tasks for unused segments 
in all data sources. */
+  /**
+   * If true {@link KillUnusedSegments} sends kill tasks for unused segments 
in all data sources.
+   */
   private final boolean killUnusedSegmentsInAllDataSources;
 
   /**
@@ -74,7 +76,7 @@ public class CoordinatorDynamicConfig
   /**
    * Stale pending segments belonging to the data sources in this list are not 
killed by {@link
    * KillStalePendingSegments}. In other words, segments in these data sources 
are "protected".
-   *
+   * <p>
    * Pending segments are considered "stale" when their created_time is older 
than {@link
    * KillStalePendingSegments#KEEP_PENDING_SEGMENTS_OFFSET} from now.
    */
@@ -134,7 +136,7 @@ public class CoordinatorDynamicConfig
       // Keeping the legacy 'killPendingSegmentsSkipList' property name for 
backward compatibility. When the project is
       // updated to Jackson 2.9 it could be changed, see 
https://github.com/apache/druid/issues/7152
       @JsonProperty("killPendingSegmentsSkipList") Object 
dataSourcesToNotKillStalePendingSegmentsIn,
-      @JsonProperty("maxSegmentsInNodeLoadingQueue") int 
maxSegmentsInNodeLoadingQueue,
+      @JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer 
maxSegmentsInNodeLoadingQueue,
       @JsonProperty("decommissioningNodes") Object decommissioningNodes,
       @JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int 
decommissioningMaxPercentOfMaxSegmentsToMove,
       @JsonProperty("pauseCoordination") boolean pauseCoordination,
@@ -149,11 +151,12 @@ public class CoordinatorDynamicConfig
     this.maxSegmentsToMove = maxSegmentsToMove;
 
     if (percentOfSegmentsToConsiderPerMove == null) {
-      log.debug("percentOfSegmentsToConsiderPerMove was null! This is likely 
because your metastore does not "
-               + "reflect this configuration being added to Druid in a recent 
release. Druid is defaulting the value "
-               + "to the Druid default of %f. It is recommended that you 
re-submit your dynamic config with your "
-               + "desired value for percentOfSegmentsToConsideredPerMove",
-               Builder.DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
+      log.debug(
+          "percentOfSegmentsToConsiderPerMove was null! This is likely because 
your metastore does not "
+          + "reflect this configuration being added to Druid in a recent 
release. Druid is defaulting the value "
+          + "to the Druid default of %f. It is recommended that you re-submit 
your dynamic config with your "
+          + "desired value for percentOfSegmentsToConsideredPerMove",
+          Builder.DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
       );
       percentOfSegmentsToConsiderPerMove = 
Builder.DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE;
     }
@@ -172,7 +175,9 @@ public class CoordinatorDynamicConfig
     this.specificDataSourcesToKillUnusedSegmentsIn = 
parseJsonStringOrArray(specificDataSourcesToKillUnusedSegmentsIn);
     this.dataSourcesToNotKillStalePendingSegmentsIn =
         parseJsonStringOrArray(dataSourcesToNotKillStalePendingSegmentsIn);
-    this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
+    this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue == null
+                                         ? 
Builder.DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE
+                                         : maxSegmentsInNodeLoadingQueue;
     this.decommissioningNodes = parseJsonStringOrArray(decommissioningNodes);
     Preconditions.checkArgument(
         decommissioningMaxPercentOfMaxSegmentsToMove >= 0 && 
decommissioningMaxPercentOfMaxSegmentsToMove <= 100,
@@ -517,7 +522,7 @@ public class CoordinatorDynamicConfig
     private static final boolean DEFAULT_EMIT_BALANCING_STATS = false;
     private static final boolean DEFAULT_USE_BATCHED_SEGMENT_SAMPLER = false;
     private static final boolean 
DEFAULT_KILL_UNUSED_SEGMENTS_IN_ALL_DATA_SOURCES = false;
-    private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 0;
+    private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 100;
     private static final int 
DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70;
     private static final boolean DEFAULT_PAUSE_COORDINATION = false;
     private static final boolean DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT = false;
@@ -732,7 +737,8 @@ public class CoordinatorDynamicConfig
           : decommissioningMaxPercentOfMaxSegmentsToMove,
           pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : 
pauseCoordination,
           replicateAfterLoadTimeout == null ? 
DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout,
-          maxNonPrimaryReplicantsToLoad == null ? 
DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD : maxNonPrimaryReplicantsToLoad
+          maxNonPrimaryReplicantsToLoad == null ? 
DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD
+                                                : maxNonPrimaryReplicantsToLoad
       );
     }
 
@@ -745,7 +751,9 @@ public class CoordinatorDynamicConfig
           mergeBytesLimit == null ? defaults.getMergeBytesLimit() : 
mergeBytesLimit,
           mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() : 
mergeSegmentsLimit,
           maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() : 
maxSegmentsToMove,
-          percentOfSegmentsToConsiderPerMove == null ? 
defaults.getPercentOfSegmentsToConsiderPerMove() : 
percentOfSegmentsToConsiderPerMove,
+          percentOfSegmentsToConsiderPerMove == null
+          ? defaults.getPercentOfSegmentsToConsiderPerMove()
+          : percentOfSegmentsToConsiderPerMove,
           useBatchedSegmentSampler == null ? 
defaults.useBatchedSegmentSampler() : useBatchedSegmentSampler,
           replicantLifetime == null ? defaults.getReplicantLifetime() : 
replicantLifetime,
           replicationThrottleLimit == null ? 
defaults.getReplicationThrottleLimit() : replicationThrottleLimit,
@@ -769,7 +777,9 @@ public class CoordinatorDynamicConfig
           : decommissioningMaxPercentOfMaxSegmentsToMove,
           pauseCoordination == null ? defaults.getPauseCoordination() : 
pauseCoordination,
           replicateAfterLoadTimeout == null ? 
defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout,
-          maxNonPrimaryReplicantsToLoad == null ? 
defaults.getMaxNonPrimaryReplicantsToLoad() : maxNonPrimaryReplicantsToLoad
+          maxNonPrimaryReplicantsToLoad == null
+          ? defaults.getMaxNonPrimaryReplicantsToLoad()
+          : maxNonPrimaryReplicantsToLoad
       );
     }
   }
diff --git 
a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
 
b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
index b6776a8..b96591a 100644
--- 
a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
@@ -31,9 +31,12 @@ import org.junit.Test;
 import java.util.Set;
 
 /**
+ *
  */
 public class CoordinatorDynamicConfigTest
 {
+  private static final int EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE 
= 100;
+
   private final ObjectMapper mapper = TestHelper.makeJsonMapper();
 
   @Test
@@ -69,25 +72,158 @@ public class CoordinatorDynamicConfigTest
     );
     ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2");
     ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, 
decommissioning, 9, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        decommissioning,
+        9,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
 
     actual = 
CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, 
ImmutableSet.of("host1"), 9, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        9,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
 
     actual = 
CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, 
ImmutableSet.of("host1"), 5, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        5,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
 
     actual = 
CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, 
ImmutableSet.of("host1"), 5, true, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        5,
+        true,
+        false,
+        Integer.MAX_VALUE
+    );
 
     actual = 
CoordinatorDynamicConfig.builder().withPercentOfSegmentsToConsiderPerMove(10).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, 
ImmutableSet.of("host1"), 5, true, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        10,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        5,
+        true,
+        false,
+        Integer.MAX_VALUE
+    );
 
     actual = 
CoordinatorDynamicConfig.builder().withReplicateAfterLoadTimeout(true).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, 
ImmutableSet.of("host1"), 5, true, true, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        10,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        5,
+        true,
+        true,
+        Integer.MAX_VALUE
+    );
 
     actual = 
CoordinatorDynamicConfig.builder().withMaxNonPrimaryReplicantsToLoad(10).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, 
ImmutableSet.of("host1"), 5, true, true, 10);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        10,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        5,
+        true,
+        true,
+        10
+    );
 
   }
 
@@ -118,13 +254,70 @@ public class CoordinatorDynamicConfigTest
     );
     ImmutableSet<String> decommissioning = ImmutableSet.of();
     ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
-    assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, 
decommissioning, 0, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        100,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        decommissioning,
+        0,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
 
     actual = 
CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, 
ImmutableSet.of("host1"), 0, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        100,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        0,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
 
     actual = 
CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, 
ImmutableSet.of("host1"), 5, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        100,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        5,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
   }
 
   @Test
@@ -271,7 +464,26 @@ public class CoordinatorDynamicConfigTest
     );
     ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2");
     ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
-    assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, 
decommissioning, 9, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        100,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        decommissioning,
+        9,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
   }
 
   @Test
@@ -301,7 +513,26 @@ public class CoordinatorDynamicConfigTest
         CoordinatorDynamicConfig.class
     );
 
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), 
true, 1, ImmutableSet.of(), 0, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        2,
+        true,
+        ImmutableSet.of(),
+        true,
+        1,
+        ImmutableSet.of(),
+        0,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
 
     //ensure whitelist is empty when killAllDataSources is true
     try {
@@ -348,7 +579,26 @@ public class CoordinatorDynamicConfigTest
         CoordinatorDynamicConfig.class
     );
 
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), 
true, 0, ImmutableSet.of(), 0, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        2,
+        true,
+        ImmutableSet.of(),
+        true,
+        EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
+        ImmutableSet.of(),
+        0,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
   }
 
   @Test
@@ -369,7 +619,7 @@ public class CoordinatorDynamicConfigTest
         false,
         emptyList,
         false,
-        0,
+        EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
         emptyList,
         70,
         false,
@@ -388,9 +638,27 @@ public class CoordinatorDynamicConfigTest
 
     Assert.assertEquals(
         current,
-        new CoordinatorDynamicConfig
-            .Builder(null, null, null, null, null, null, null, null, null, 
null, null, null, null, null, null, null, null, null, null)
-            .build(current)
+        new CoordinatorDynamicConfig.Builder(
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null
+        ).build(current)
     );
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to