KarmaGYZ commented on code in PR #23658:
URL: https://github.com/apache/flink/pull/23658#discussion_r1382732177


##########
flink-core/src/test/java/org/apache/flink/configuration/TaskManagerLoadBalanceModeTest.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.flink.configuration.ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY;
+import static org.apache.flink.configuration.TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE;
+import static org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link TaskManagerLoadBalanceMode}. */
+class TaskManagerLoadBalanceModeTest {
+
+    @Test
+    void testReadTaskManagerLoadBalanceMode() {
+        // Check for non-set 'taskmanager.load-balance.mode' and
+        // 'cluster.evenly-spread-out-slots: false'
+        Configuration conf1 = new Configuration();
+        assertThat(TaskManagerLoadBalanceMode.loadFromConfiguration(conf1))
+                .isEqualTo(TASK_MANAGER_LOAD_BALANCE_MODE.defaultValue());
+
+        // Check for non-set 'taskmanager.load-balance.mode' and
+        // 'cluster.evenly-spread-out-slots: true'
+        Configuration conf2 = new Configuration();
+        conf2.set(EVENLY_SPREAD_OUT_SLOTS_STRATEGY, true);
+        assertThat(TaskManagerLoadBalanceMode.loadFromConfiguration(conf2))
+                .isEqualTo(TaskManagerLoadBalanceMode.SLOTS);
+
+        // Check for setting manually 'taskmanager.load-balance.mode: NONE' and
+        // 'cluster.evenly-spread-out-slots: false'
+        Configuration conf3 = new Configuration();
+        conf3.set(TASK_MANAGER_LOAD_BALANCE_MODE, TaskManagerLoadBalanceMode.NONE);
+        assertThat(TaskManagerLoadBalanceMode.loadFromConfiguration(conf3))
+                .isEqualTo(TaskManagerLoadBalanceMode.NONE);
+
+        // Check for setting manually 'taskmanager.load-balance.mode: NONE' and
+        // 'cluster.evenly-spread-out-slots: true'
+        Configuration conf4 = new Configuration();
+        conf4.set(TASK_MANAGER_LOAD_BALANCE_MODE, TaskManagerLoadBalanceMode.NONE);
+        conf4.set(EVENLY_SPREAD_OUT_SLOTS_STRATEGY, true);
+        assertThat(TaskManagerLoadBalanceMode.loadFromConfiguration(conf4))
+                .isEqualTo(TaskManagerLoadBalanceMode.NONE);

Review Comment:
   Following FLIP-321, I think we need to guard backward compatibility here.
If a user configures `EVENLY_SPREAD_OUT_SLOTS_STRATEGY`, the behavior should be
the same as configuring `TaskManagerLoadBalanceMode.SLOTS`.
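
   A minimal, dependency-free sketch of one reading of this comment: the legacy
flag set to `true` yields `SLOTS` even when the new option is explicitly `NONE`,
which would change the expectation for `conf4` above. The class, enum, and
method names here are hypothetical and this is not Flink's implementation; only
the two key strings come from the diff. Whether an explicitly set new option
should instead take precedence is left to the discussion.

```java
import java.util.Map;

/**
 * Dependency-free sketch; NOT Flink's implementation. The class, enum, and
 * method names are hypothetical; only the two key strings come from the diff.
 */
public class LoadBalanceModeSketch {

    enum Mode { NONE, SLOTS }

    static final String MODE_KEY = "taskmanager.load-balance.mode";
    static final String LEGACY_KEY = "cluster.evenly-spread-out-slots";

    /** Resolves the effective mode, letting a legacy flag set to true imply SLOTS. */
    static Mode resolve(Map<String, String> conf) {
        // Assumed reading of the review comment: legacy flag true => SLOTS,
        // regardless of what the new option says.
        if (Boolean.parseBoolean(conf.get(LEGACY_KEY))) {
            return Mode.SLOTS;
        }
        // Otherwise use the explicitly configured mode, or fall back to NONE.
        String explicit = conf.get(MODE_KEY);
        return explicit == null ? Mode.NONE : Mode.valueOf(explicit);
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of()));                       // NONE
        System.out.println(resolve(Map.of(LEGACY_KEY, "true")));     // SLOTS
        System.out.println(
                resolve(Map.of(MODE_KEY, "NONE", LEGACY_KEY, "true"))); // SLOTS
    }
}
```

   Under this reading, only the last case differs from what the test currently
asserts.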



##########
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java:
##########
@@ -708,6 +711,49 @@ public class TaskManagerOptions {
                             "Time we wait for the timers in milliseconds to finish all pending timer threads"
                                     + " when the stream task is cancelled.");
 
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER)
+    public static final ConfigOption<TaskManagerLoadBalanceMode> TASK_MANAGER_LOAD_BALANCE_MODE =
+            ConfigOptions.key("taskmanager.load-balance.mode")
+                    .enumType(TaskManagerLoadBalanceMode.class)
+                    .defaultValue(TaskManagerLoadBalanceMode.NONE)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Mode for the load-balance allocation strategy across all available %s. "
+                                                    + "The %s mode tries to spread out the slots evenly across all available %s. "
+                                                    + "The %s mode is the default mode without any specified strategy. ",

Review Comment:
   You can use `list()` to list all the valid options.



##########
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java:
##########
@@ -708,6 +711,49 @@ public class TaskManagerOptions {
                             "Time we wait for the timers in milliseconds to finish all pending timer threads"
                                     + " when the stream task is cancelled.");
 
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER)

Review Comment:
   Should it belong to `EXPERT_SCHEDULING`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
