1996fanrui commented on code in PR #23658:
URL: https://github.com/apache/flink/pull/23658#discussion_r1382318102


##########
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerLoadBalanceMode.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nonnull;
+
+/** Type of {@link TaskManagerOptions#TASK_MANAGER_LOAD_BALANCE_MODE}. */
+@Internal
+public enum TaskManagerLoadBalanceMode {

Review Comment:
   I don't think it should be a separate class, especially it shouldn't be 
marked as `@Internal` class, because users will use it during `conf.set`, such 
as : `conf.set(TASK_MANAGER_LOAD_BALANCE_MODE, 
TaskManagerLoadBalanceMode.SLOTS)`.
   
   It's better to add the class as a internal class of `TaskManagerOptions`. 
Please check out `JobManagerOptions.SchedulerType`.



##########
flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java:
##########
@@ -84,17 +88,47 @@ public class ClusterOptions {
                                     + "By default it will use 4 * the number 
of CPU cores (hardware contexts) that the cluster process has access to. "
                                     + "Increasing the pool size allows to run 
more IO operations concurrently.");
 
+    /**
+     * @deprecated Please use {@link 
TaskManagerOptions#TASK_MANAGER_LOAD_BALANCE_MODE} instead.
+     *     Note: The 'taskmanager.load-balance.mode: SLOTS' is equal to
+     *     'cluster.evenly-spread-out-slots: true'. The 
'taskmanager.load-balance.mode: NONE' is
+     *     equal to 'cluster.evenly-spread-out-slots: false'.
+     */
+    @Deprecated
     @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
     public static final ConfigOption<Boolean> EVENLY_SPREAD_OUT_SLOTS_STRATEGY 
=
-            ConfigOptions.key("cluster.evenly-spread-out-slots")
+            ConfigOptions.key(EVENLY_SPREAD_OUT_SLOTS_STRATEGY_KEY)
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Enable the slot spread out 
allocation strategy. This strategy tries to spread out "
-                                                    + "the slots evenly across 
all available %s.",
-                                            code("TaskExecutors"))
+                                            "Enable the slot spread out 
allocation strategy. "
+                                                    + "This strategy tries to 
spread out "
+                                                    + "the slots evenly across 
all available %s. "
+                                                    + "Note: The configuration 
item is deprecated. "
+                                                    + "Please use '%s' 
instead. "
+                                                    + "The '%s: %s' is equal 
to '%s: %s'. "
+                                                    + "The '%s: %s' is equal 
to '%s: %s'.",
+                                            code("TaskExecutors"),
+                                            code(
+                                                    TaskManagerOptions
+                                                            
.TASK_MANAGER_LOAD_BALANCE_MODE
+                                                            .key()),
+                                            code(
+                                                    TaskManagerOptions
+                                                            
.TASK_MANAGER_LOAD_BALANCE_MODE
+                                                            .key()),
+                                            code("SLOTS"),
+                                            
code(EVENLY_SPREAD_OUT_SLOTS_STRATEGY_KEY),
+                                            code("true"),
+                                            code(
+                                                    TaskManagerOptions
+                                                            
.TASK_MANAGER_LOAD_BALANCE_MODE
+                                                            .key()),
+                                            code("NONE"),
+                                            
code(EVENLY_SPREAD_OUT_SLOTS_STRATEGY_KEY),
+                                            code("false"))

Review Comment:
   This change may be not needed.
   
   This desctription is used in doc, however depreated options will be removed 
in doc. You can check out your html change.
   
   If this suggestion makes sense, please clean up the code, for example: check 
whether `EVENLY_SPREAD_OUT_SLOTS_STRATEGY_KEY` can be reverted.



##########
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java:
##########
@@ -708,6 +708,23 @@ public class TaskManagerOptions {
                             "Time we wait for the timers in milliseconds to 
finish all pending timer threads"
                                     + " when the stream task is cancelled.");
 
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER)
+    public static final ConfigOption<TaskManagerLoadBalanceMode> 
TASK_MANAGER_LOAD_BALANCE_MODE =
+            ConfigOptions.key("taskmanager.load-balance.mode")
+                    .enumType(TaskManagerLoadBalanceMode.class)
+                    .defaultValue(TaskManagerLoadBalanceMode.NONE)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Mode for the load-balance 
allocation strategy across all available %s. "
+                                                    + "The %s mode tries to 
spread out the slots evenly across all available %s. "
+                                                    + "The %s mode is the 
default mode without any specified strategy. ",
+                                            code("TaskExecutors"),

Review Comment:
   Would you mind updating it from `TaskExecutors` to `TaskManagers`?
   
   TaskExecutor is a class. From user side, we always call it `TaskManagers`.



##########
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerLoadBalanceMode.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nonnull;
+
+/** Type of {@link TaskManagerOptions#TASK_MANAGER_LOAD_BALANCE_MODE}. */
+@Internal
+public enum TaskManagerLoadBalanceMode {
+    NONE,
+    SLOTS;
+
+    /**
+     * The method is mainly to load the {@link 
TaskManagerOptions#TASK_MANAGER_LOAD_BALANCE_MODE}
+     * from {@link Configuration}, which is compatible with {@link
+     * ClusterOptions#EVENLY_SPREAD_OUT_SLOTS_STRATEGY}.
+     */
+    public static TaskManagerLoadBalanceMode loadFromConfiguration(
+            @Nonnull Configuration configuration) {
+        TaskManagerLoadBalanceMode taskManagerLoadBalanceMode =
+                
configuration.get(TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE);
+        boolean evenlySpreadOutSlots =
+                
configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
+
+        if (taskManagerLoadBalanceMode != TaskManagerLoadBalanceMode.NONE
+                && taskManagerLoadBalanceMode != null) {
+            return taskManagerLoadBalanceMode;
+        }
+        if (evenlySpreadOutSlots) {
+            return TaskManagerLoadBalanceMode.SLOTS;
+        }

Review Comment:
   > please note we only update `taskmanager.load-balance.mode` to Slots when 
the `taskmanager.load-balance.mode` isn't set and 
`cluster.evenly-spread-out-slots` is true.
   
   As I metioned in FLINK-33348, the current code doesn't respect this rule.
   
   - If user sets `taskmanager.load-balance.mode : None` and 
`cluster.evenly-spread-out-slots : true` manually, we should respect 
`taskmanager.load-balance.mode : None`.
   - If user doesn't set `taskmanager.load-balance.mode : None` and set 
`cluster.evenly-spread-out-slots : true`, we should update 
`taskmanager.load-balance.mode` to `Slots`.
   
   In biref, the default `None` isn't same with user set 
`taskmanager.load-balance.mode` to `None` manually.
   
   How to check the None is default value or from user set? Please check 
`conf.getOptional()` instead of `conf.get()` , and please add a test for these 
2 cases.
   
   Please correct me if my understanding is wrong.



##########
flink-core/src/test/java/org/apache/flink/configuration/TaskManagerLoadBalanceModeTest.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import static 
org.apache.flink.configuration.ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY;
+import static 
org.apache.flink.configuration.TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link TaskManagerLoadBalanceMode}. */
+@ExtendWith(TestLoggerExtension.class)
+class TaskManagerLoadBalanceModeTest {
+
+    @Test
+    void testReadTaskManagerLoadBalanceMode() {
+        Configuration conf = new Configuration();
+
+        // Check for 'taskmanager.load-balance.mode: NONE' and 
'cluster.evenly-spread-out-slots:
+        // false'
+        assertThat(TaskManagerLoadBalanceMode.loadFromConfiguration(conf))
+                .isEqualTo(TaskManagerLoadBalanceMode.NONE);
+
+        // Check for 'taskmanager.load-balance.mode: NONE' and 
'cluster.evenly-spread-out-slots:
+        // true'
+        conf.setString(EVENLY_SPREAD_OUT_SLOTS_STRATEGY.key(), "true");
+        assertThat(TaskManagerLoadBalanceMode.loadFromConfiguration(conf))
+                .isEqualTo(TaskManagerLoadBalanceMode.SLOTS);
+
+        // Check for 'taskmanager.load-balance.mode: SLOTS' and 
'cluster.evenly-spread-out-slots:
+        // false'
+        conf.setString(
+                TASK_MANAGER_LOAD_BALANCE_MODE.key(), 
TaskManagerLoadBalanceMode.SLOTS.name());
+        assertThat(TaskManagerLoadBalanceMode.loadFromConfiguration(conf))
+                .isEqualTo(TaskManagerLoadBalanceMode.SLOTS);
+
+        // Check for 'taskmanager.load-balance.mode: SLOTS' and 
'cluster.evenly-spread-out-slots:
+        // true'
+        conf.setString(EVENLY_SPREAD_OUT_SLOTS_STRATEGY.key(), "true");
+        assertThat(TaskManagerLoadBalanceMode.loadFromConfiguration(conf))
+                .isEqualTo(TaskManagerLoadBalanceMode.SLOTS);

Review Comment:
   It's better to split these tests to separate code block. The conf is used 
and changed by all cases, it's not clear.
   
   For example, you want to test `taskmanager.load-balance.mode: SLOTS and 
cluster.evenly-spread-out-slots: false` in the third case, you must check what 
the `cluster.evenly-spread-out-slots` is in the second case. It's not clear and 
easy to bug.
   
   
   ```
   // code block1
   {
     Configuration conf = new Configuration();
   }
   
   
   // code block2
   {
     Configuration conf = new Configuration();
   }
   ```



##########
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerLoadBalanceMode.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nonnull;
+
+/** Type of {@link TaskManagerOptions#TASK_MANAGER_LOAD_BALANCE_MODE}. */
+@Internal
+public enum TaskManagerLoadBalanceMode {
+    NONE,
+    SLOTS;
+
+    /**
+     * The method is mainly to load the {@link 
TaskManagerOptions#TASK_MANAGER_LOAD_BALANCE_MODE}
+     * from {@link Configuration}, which is compatible with {@link
+     * ClusterOptions#EVENLY_SPREAD_OUT_SLOTS_STRATEGY}.
+     */
+    public static TaskManagerLoadBalanceMode loadFromConfiguration(
+            @Nonnull Configuration configuration) {
+        TaskManagerLoadBalanceMode taskManagerLoadBalanceMode =
+                
configuration.get(TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE);
+        boolean evenlySpreadOutSlots =
+                
configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
+
+        if (taskManagerLoadBalanceMode != TaskManagerLoadBalanceMode.NONE
+                && taskManagerLoadBalanceMode != null) {

Review Comment:
   You wrote a couple of `taskManagerLoadBalanceMode != null` or 
`taskManagerLoadBalanceMode == null`.
   
   IIUC, it never be null due to 
`TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE` has default value.
   
   Please clean up your code and remove uncessary code, thanks~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to