This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ed354d9c93b38d843269c29b291271ba8400c7d9
Author: Xintong Song <[email protected]>
AuthorDate: Thu Jan 7 14:30:14 2021 +0800

    [FLINK-20860][core] Update valid names for 
TaskManagerOptions#MANAGED_MEMORY_CONSUMER_WEIGHTS.
    
    This closes #14576
---
 .../_includes/generated/common_memory_section.html |   4 +-
 .../task_manager_memory_configuration.html         |   4 +-
 .../flink/configuration/TaskManagerOptions.java    |  27 +++--
 .../util/config/memory/ManagedMemoryUtils.java     | 109 ++++++++++++---------
 .../util/config/memory/ManagedMemoryUtilsTest.java |  86 +++++++++++++---
 5 files changed, 154 insertions(+), 76 deletions(-)

diff --git a/docs/_includes/generated/common_memory_section.html 
b/docs/_includes/generated/common_memory_section.html
index c2e0160..f4406d5 100644
--- a/docs/_includes/generated/common_memory_section.html
+++ b/docs/_includes/generated/common_memory_section.html
@@ -106,9 +106,9 @@
         </tr>
         <tr>
             <td><h5>taskmanager.memory.managed.consumer-weights</h5></td>
-            <td style="word-wrap: break-word;">DATAPROC:70,PYTHON:30</td>
+            <td style="word-wrap: 
break-word;">OPERATOR:70,STATE_BACKEND:70,PYTHON:30</td>
             <td>Map</td>
-            <td>Managed memory weights for different kinds of consumers. A 
slot’s managed memory is shared by all kinds of consumers it contains, 
proportionally to the kinds’ weights and regardless of the number of consumers 
from each kind. Currently supported kinds of consumers are DATAPROC (for 
RocksDB state backend in streaming and built-in algorithms in batch) and PYTHON 
(for Python processes).</td>
+            <td>Managed memory weights for different kinds of consumers. A 
slot’s managed memory is shared by all kinds of consumers it contains, 
proportionally to the kinds’ weights and regardless of the number of consumers 
from each kind. Currently supported kinds of consumers are OPERATOR (for 
built-in algorithms), STATE_BACKEND (for RocksDB state backend) and PYTHON (for 
Python processes).</td>
         </tr>
         <tr>
             <td><h5>taskmanager.memory.managed.fraction</h5></td>
diff --git a/docs/_includes/generated/task_manager_memory_configuration.html 
b/docs/_includes/generated/task_manager_memory_configuration.html
index d23b267..62fa930 100644
--- a/docs/_includes/generated/task_manager_memory_configuration.html
+++ b/docs/_includes/generated/task_manager_memory_configuration.html
@@ -52,9 +52,9 @@
         </tr>
         <tr>
             <td><h5>taskmanager.memory.managed.consumer-weights</h5></td>
-            <td style="word-wrap: break-word;">DATAPROC:70,PYTHON:30</td>
+            <td style="word-wrap: 
break-word;">OPERATOR:70,STATE_BACKEND:70,PYTHON:30</td>
             <td>Map</td>
-            <td>Managed memory weights for different kinds of consumers. A 
slot’s managed memory is shared by all kinds of consumers it contains, 
proportionally to the kinds’ weights and regardless of the number of consumers 
from each kind. Currently supported kinds of consumers are DATAPROC (for 
RocksDB state backend in streaming and built-in algorithms in batch) and PYTHON 
(for Python processes).</td>
+            <td>Managed memory weights for different kinds of consumers. A 
slot’s managed memory is shared by all kinds of consumers it contains, 
proportionally to the kinds’ weights and regardless of the number of consumers 
from each kind. Currently supported kinds of consumers are OPERATOR (for 
built-in algorithms), STATE_BACKEND (for RocksDB state backend) and PYTHON (for 
Python processes).</td>
         </tr>
         <tr>
             <td><h5>taskmanager.memory.managed.fraction</h5></td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 018413f..ee1bef3 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -37,7 +37,14 @@ import static 
org.apache.flink.configuration.description.TextElement.text;
 @ConfigGroups(groups = @ConfigGroup(name = "TaskManagerMemory", keyPrefix = 
"taskmanager.memory"))
 public class TaskManagerOptions {
 
-    public static final String MANAGED_MEMORY_CONSUMER_NAME_DATAPROC = 
"DATAPROC";
+    /**
+     * @deprecated use {@link #MANAGED_MEMORY_CONSUMER_NAME_OPERATOR} and 
{@link
+     *     #MANAGED_MEMORY_CONSUMER_NAME_STATE_BACKEND} instead
+     */
+    @Deprecated public static final String 
MANAGED_MEMORY_CONSUMER_NAME_DATAPROC = "DATAPROC";
+
+    public static final String MANAGED_MEMORY_CONSUMER_NAME_OPERATOR = 
"OPERATOR";
+    public static final String MANAGED_MEMORY_CONSUMER_NAME_STATE_BACKEND = 
"STATE_BACKEND";
     public static final String MANAGED_MEMORY_CONSUMER_NAME_PYTHON = "PYTHON";
 
     // ------------------------------------------------------------------------
@@ -433,17 +440,21 @@ public class TaskManagerOptions {
                     .defaultValue(
                             new HashMap<String, String>() {
                                 {
-                                    put(MANAGED_MEMORY_CONSUMER_NAME_DATAPROC, 
"70");
+                                    put(MANAGED_MEMORY_CONSUMER_NAME_OPERATOR, 
"70");
+                                    
put(MANAGED_MEMORY_CONSUMER_NAME_STATE_BACKEND, "70");
                                     put(MANAGED_MEMORY_CONSUMER_NAME_PYTHON, 
"30");
                                 }
                             })
                     .withDescription(
-                            "Managed memory weights for different kinds of 
consumers. A slot’s managed memory is"
-                                    + " shared by all kinds of consumers it 
contains, proportionally to the kinds’ weights and regardless"
-                                    + " of the number of consumers from each 
kind. Currently supported kinds of consumers are "
-                                    + MANAGED_MEMORY_CONSUMER_NAME_DATAPROC
-                                    + " (for RocksDB state backend in 
streaming and built-in"
-                                    + " algorithms in batch) and "
+                            "Managed memory weights for different kinds of 
consumers. A slot’s"
+                                    + " managed memory is shared by all kinds 
of consumers it"
+                                    + " contains, proportionally to the kinds’ 
weights and"
+                                    + " regardless of the number of consumers 
from each kind."
+                                    + " Currently supported kinds of consumers 
are "
+                                    + MANAGED_MEMORY_CONSUMER_NAME_OPERATOR
+                                    + " (for built-in algorithms), "
+                                    + 
MANAGED_MEMORY_CONSUMER_NAME_STATE_BACKEND
+                                    + " (for RocksDB state backend) and "
                                     + MANAGED_MEMORY_CONSUMER_NAME_PYTHON
                                     + " (for Python processes).");
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java
index 8fe0e03..3b54a93 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java
@@ -19,22 +19,25 @@
 package org.apache.flink.runtime.util.config.memory;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
 import org.apache.flink.runtime.state.StateBackendLoader;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /** Utils for configuration and calculations related to managed memory and its 
various use cases. */
 public enum ManagedMemoryUtils {
@@ -44,11 +47,20 @@ public enum ManagedMemoryUtils {
 
     private static final int MANAGED_MEMORY_FRACTION_SCALE = 16;
 
-    /** Valid names of managed memory consumers. */
-    private static final String[] MANAGED_MEMORY_CONSUMER_VALID_NAMES = {
-        TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_DATAPROC,
-        TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_PYTHON
-    };
+    /** Names of managed memory use cases, in the fallback order. */
+    @SuppressWarnings("deprecation")
+    private static final Map<ManagedMemoryUseCase, List<String>> 
USE_CASE_CONSUMER_NAMES =
+            ImmutableMap.of(
+                    ManagedMemoryUseCase.OPERATOR,
+                    ImmutableList.of(
+                            
TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_OPERATOR,
+                            
TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_DATAPROC),
+                    ManagedMemoryUseCase.STATE_BACKEND,
+                    ImmutableList.of(
+                            
TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_STATE_BACKEND,
+                            
TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_DATAPROC),
+                    ManagedMemoryUseCase.PYTHON,
+                    
ImmutableList.of(TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_PYTHON));
 
     public static double convertToFractionOfSlot(
             ManagedMemoryUseCase useCase,
@@ -86,52 +98,53 @@ public enum ManagedMemoryUtils {
     @VisibleForTesting
     static Map<ManagedMemoryUseCase, Integer> 
getManagedMemoryUseCaseWeightsFromConfig(
             Configuration config) {
-        final Map<String, String> consumerWeights =
+        final Map<String, String> configuredWeights =
                 config.get(TaskManagerOptions.MANAGED_MEMORY_CONSUMER_WEIGHTS);
+        final Map<ManagedMemoryUseCase, Integer> effectiveWeights = new 
HashMap<>();
+
+        for (Map.Entry<ManagedMemoryUseCase, List<String>> entry :
+                USE_CASE_CONSUMER_NAMES.entrySet()) {
+            final ManagedMemoryUseCase useCase = entry.getKey();
+            final Iterator<String> nameIter = entry.getValue().iterator();
+
+            boolean findWeight = false;
+            while (!findWeight && nameIter.hasNext()) {
+                final String name = nameIter.next();
+                final String weightStr = configuredWeights.get(name);
+                if (weightStr != null) {
+                    final int weight = Integer.parseInt(weightStr);
+                    findWeight = true;
+
+                    if (weight < 0) {
+                        throw new IllegalConfigurationException(
+                                String.format(
+                                        "Managed memory weight should not be 
negative. Configured "
+                                                + "weight for %s is %d.",
+                                        useCase, weight));
+                    }
+
+                    if (weight == 0) {
+                        LOG.debug(
+                                "Managed memory consumer weight for {} is 
configured to 0. Jobs "
+                                        + "containing this type of managed 
memory consumers may "
+                                        + "fail due to not being able to 
allocate managed memory.",
+                                useCase);
+                    }
+
+                    effectiveWeights.put(useCase, weight);
+                }
+            }
 
-        for (String consumer : MANAGED_MEMORY_CONSUMER_VALID_NAMES) {
-            if (!consumerWeights.containsKey(consumer)) {
+            if (!findWeight) {
                 LOG.debug(
-                        "Managed memory consumer weight for {} is not 
configured. Jobs containing this type of "
-                                + "managed memory consumers may fail due to 
not being able to allocate managed memory.",
-                        consumer);
+                        "Managed memory consumer weight for {} is not 
configured. Jobs containing "
+                                + "this type of managed memory consumers may 
fail due to not being "
+                                + "able to allocate managed memory.",
+                        useCase);
             }
         }
 
-        return consumerWeights.entrySet().stream()
-                .flatMap(
-                        (entry) -> {
-                            final String consumer = entry.getKey();
-                            final int weight = 
Integer.parseInt(entry.getValue());
-
-                            if (weight < 0) {
-                                throw new IllegalConfigurationException(
-                                        String.format(
-                                                "Managed memory weight should 
not be negative. Configured weight for %s is %d.",
-                                                consumer, weight));
-                            }
-
-                            if (weight == 0) {
-                                LOG.debug(
-                                        "Managed memory consumer weight for {} 
is configured to 0. Jobs containing this type of "
-                                                + "managed memory consumers 
may fail due to not being able to allocate managed memory.",
-                                        consumer);
-                            }
-
-                            switch (consumer) {
-                                case 
TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_DATAPROC:
-                                    return Stream.of(
-                                            
Tuple2.of(ManagedMemoryUseCase.OPERATOR, weight),
-                                            
Tuple2.of(ManagedMemoryUseCase.STATE_BACKEND, weight));
-                                case 
TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_PYTHON:
-                                    return Stream.of(
-                                            
Tuple2.of(ManagedMemoryUseCase.PYTHON, weight));
-                                default:
-                                    throw new IllegalConfigurationException(
-                                            "Unknown managed memory consumer: 
" + consumer);
-                            }
-                        })
-                .collect(Collectors.toMap((tuple2) -> tuple2.f0, (tuple2) -> 
tuple2.f1));
+        return effectiveWeights;
     }
 
     public static double getFractionRoundedDown(final long dividend, final 
long divisor) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java
index 650a1c7..31ab8fe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java
@@ -45,6 +45,9 @@ public class ManagedMemoryUtilsTest extends TestLogger {
 
     private static final int DATA_PROC_WEIGHT = 111;
     private static final int PYTHON_WEIGHT = 222;
+    private static final int OPERATOR_WEIGHT = 333;
+    private static final int STATE_BACKEND_WEIGHT = 444;
+    private static final int TOTAL_WEIGHT = PYTHON_WEIGHT + OPERATOR_WEIGHT + 
STATE_BACKEND_WEIGHT;
 
     private static final UnmodifiableConfiguration CONFIG_WITH_ALL_USE_CASES =
             new UnmodifiableConfiguration(
@@ -62,6 +65,35 @@ public class ManagedMemoryUtilsTest extends TestLogger {
                                                     TaskManagerOptions
                                                             
.MANAGED_MEMORY_CONSUMER_NAME_PYTHON,
                                                     
String.valueOf(PYTHON_WEIGHT));
+                                            put(
+                                                    TaskManagerOptions
+                                                            
.MANAGED_MEMORY_CONSUMER_NAME_OPERATOR,
+                                                    
String.valueOf(OPERATOR_WEIGHT));
+                                            put(
+                                                    TaskManagerOptions
+                                                            
.MANAGED_MEMORY_CONSUMER_NAME_STATE_BACKEND,
+                                                    
String.valueOf(STATE_BACKEND_WEIGHT));
+                                        }
+                                    });
+                        }
+                    });
+
+    private static final UnmodifiableConfiguration 
CONFIG_WITH_LEGACY_USE_CASES =
+            new UnmodifiableConfiguration(
+                    new Configuration() {
+                        {
+                            set(
+                                    
TaskManagerOptions.MANAGED_MEMORY_CONSUMER_WEIGHTS,
+                                    new HashMap<String, String>() {
+                                        {
+                                            put(
+                                                    TaskManagerOptions
+                                                            
.MANAGED_MEMORY_CONSUMER_NAME_DATAPROC,
+                                                    
String.valueOf(DATA_PROC_WEIGHT));
+                                            put(
+                                                    TaskManagerOptions
+                                                            
.MANAGED_MEMORY_CONSUMER_NAME_PYTHON,
+                                                    
String.valueOf(PYTHON_WEIGHT));
                                         }
                                     });
                         }
@@ -72,8 +104,8 @@ public class ManagedMemoryUtilsTest extends TestLogger {
         final Map<ManagedMemoryUseCase, Integer> expectedWeights =
                 new HashMap<ManagedMemoryUseCase, Integer>() {
                     {
-                        put(ManagedMemoryUseCase.STATE_BACKEND, 
DATA_PROC_WEIGHT);
-                        put(ManagedMemoryUseCase.OPERATOR, DATA_PROC_WEIGHT);
+                        put(ManagedMemoryUseCase.OPERATOR, OPERATOR_WEIGHT);
+                        put(ManagedMemoryUseCase.STATE_BACKEND, 
STATE_BACKEND_WEIGHT);
                         put(ManagedMemoryUseCase.PYTHON, PYTHON_WEIGHT);
                     }
                 };
@@ -85,18 +117,22 @@ public class ManagedMemoryUtilsTest extends TestLogger {
         assertThat(configuredWeights, is(expectedWeights));
     }
 
-    @Test(expected = IllegalConfigurationException.class)
-    public void testGetWeightsFromConfigFailUnknownUseCase() {
-        final Configuration config =
-                new Configuration() {
+    @Test
+    public void testGetWeightsFromConfigLegacy() {
+        final Map<ManagedMemoryUseCase, Integer> expectedWeights =
+                new HashMap<ManagedMemoryUseCase, Integer>() {
                     {
-                        set(
-                                
TaskManagerOptions.MANAGED_MEMORY_CONSUMER_WEIGHTS,
-                                Collections.singletonMap("UNKNOWN_KEY", 
"123"));
+                        put(ManagedMemoryUseCase.OPERATOR, DATA_PROC_WEIGHT);
+                        put(ManagedMemoryUseCase.STATE_BACKEND, 
DATA_PROC_WEIGHT);
+                        put(ManagedMemoryUseCase.PYTHON, PYTHON_WEIGHT);
                     }
                 };
 
-        ManagedMemoryUtils.getManagedMemoryUseCaseWeightsFromConfig(config);
+        final Map<ManagedMemoryUseCase, Integer> configuredWeights =
+                ManagedMemoryUtils.getManagedMemoryUseCaseWeightsFromConfig(
+                        CONFIG_WITH_LEGACY_USE_CASES);
+
+        assertThat(configuredWeights, is(expectedWeights));
     }
 
     @Test(expected = IllegalConfigurationException.class)
@@ -107,7 +143,7 @@ public class ManagedMemoryUtilsTest extends TestLogger {
                         set(
                                 
TaskManagerOptions.MANAGED_MEMORY_CONSUMER_WEIGHTS,
                                 Collections.singletonMap(
-                                        
TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_DATAPROC,
+                                        
TaskManagerOptions.MANAGED_MEMORY_CONSUMER_NAME_OPERATOR,
                                         "-123"));
                     }
                 };
@@ -127,14 +163,15 @@ public class ManagedMemoryUtilsTest extends TestLogger {
                         new HashSet<ManagedMemoryUseCase>() {
                             {
                                 add(ManagedMemoryUseCase.OPERATOR);
+                                add(ManagedMemoryUseCase.STATE_BACKEND);
                                 add(ManagedMemoryUseCase.PYTHON);
                             }
                         },
                         CONFIG_WITH_ALL_USE_CASES,
-                        Optional.empty(),
+                        Optional.of(true),
                         ClassLoader.getSystemClassLoader());
 
-        assertEquals(fractionOfUseCase / 3, fractionOfSlot, DELTA);
+        assertEquals(fractionOfUseCase * OPERATOR_WEIGHT / TOTAL_WEIGHT, 
fractionOfSlot, DELTA);
     }
 
     @Test
@@ -158,11 +195,12 @@ public class ManagedMemoryUtilsTest extends TestLogger {
                         new HashSet<ManagedMemoryUseCase>() {
                             {
                                 add(ManagedMemoryUseCase.OPERATOR);
+                                add(ManagedMemoryUseCase.STATE_BACKEND);
                                 add(ManagedMemoryUseCase.PYTHON);
                             }
                         },
                         config,
-                        Optional.empty(),
+                        Optional.of(true),
                         ClassLoader.getSystemClassLoader());
 
         assertEquals(0.0, fractionOfSlot, DELTA);
@@ -171,27 +209,42 @@ public class ManagedMemoryUtilsTest extends TestLogger {
     @Test
     public void testConvertToFractionOfSlotStateBackendUseManagedMemory() {
         testConvertToFractionOfSlotGivenWhetherStateBackendUsesManagedMemory(
-                true, 1.0 / 3, 1.0 * 2 / 3);
+                true,
+                1.0 * OPERATOR_WEIGHT / TOTAL_WEIGHT,
+                1.0 * STATE_BACKEND_WEIGHT / TOTAL_WEIGHT,
+                1.0 * PYTHON_WEIGHT / TOTAL_WEIGHT);
     }
 
     @Test
     public void testConvertToFractionOfSlotStateBackendNotUserManagedMemory() {
-        
testConvertToFractionOfSlotGivenWhetherStateBackendUsesManagedMemory(false, 
0.0, 1.0);
+        final int totalWeight = OPERATOR_WEIGHT + PYTHON_WEIGHT;
+        testConvertToFractionOfSlotGivenWhetherStateBackendUsesManagedMemory(
+                false, 1.0 * OPERATOR_WEIGHT / totalWeight, 0.0, 1.0 * 
PYTHON_WEIGHT / totalWeight);
     }
 
     private void 
testConvertToFractionOfSlotGivenWhetherStateBackendUsesManagedMemory(
             boolean stateBackendUsesManagedMemory,
+            double expectedOperatorFractionOfSlot,
             double expectedStateFractionOfSlot,
             double expectedPythonFractionOfSlot) {
 
         final Set<ManagedMemoryUseCase> allUseCases =
                 new HashSet<ManagedMemoryUseCase>() {
                     {
+                        add(ManagedMemoryUseCase.OPERATOR);
                         add(ManagedMemoryUseCase.STATE_BACKEND);
                         add(ManagedMemoryUseCase.PYTHON);
                     }
                 };
 
+        final double opFractionOfSlot =
+                ManagedMemoryUtils.convertToFractionOfSlot(
+                        ManagedMemoryUseCase.OPERATOR,
+                        1.0,
+                        allUseCases,
+                        CONFIG_WITH_ALL_USE_CASES,
+                        Optional.of(stateBackendUsesManagedMemory),
+                        ClassLoader.getSystemClassLoader());
         final double stateFractionOfSlot =
                 ManagedMemoryUtils.convertToFractionOfSlot(
                         ManagedMemoryUseCase.STATE_BACKEND,
@@ -209,6 +262,7 @@ public class ManagedMemoryUtilsTest extends TestLogger {
                         Optional.of(stateBackendUsesManagedMemory),
                         ClassLoader.getSystemClassLoader());
 
+        assertEquals(expectedOperatorFractionOfSlot, opFractionOfSlot, DELTA);
         assertEquals(expectedStateFractionOfSlot, stateFractionOfSlot, DELTA);
         assertEquals(expectedPythonFractionOfSlot, pythonFractionOfSlot, 
DELTA);
     }

Reply via email to