This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e25604  [FLINK-21916] Allows multiple kinds of ManagedMemoryUseCase for the same operator
0e25604 is described below

commit 0e2560402e5f7c4d24cb9b3b05d79065bb97af6d
Author: Dian Fu <[email protected]>
AuthorDate: Mon Mar 22 18:15:20 2021 +0800

    [FLINK-21916] Allows multiple kinds of ManagedMemoryUseCase for the same operator
    
    This closes #15337.
---
 .../util/config/memory/ManagedMemoryUtils.java     | 14 ++++++++
 .../util/config/memory/ManagedMemoryUtilsTest.java | 39 ++++++++++++++++++++++
 .../api/graph/SimpleTransformationTranslator.java  |  8 +++--
 3 files changed, 58 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java
index 3b54a93..850a7fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java
@@ -39,6 +39,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /** Utils for configuration and calculations related to managed memory and its 
various use cases. */
 public enum ManagedMemoryUtils {
     ;
@@ -155,4 +157,16 @@ public enum ManagedMemoryUtils {
                         BigDecimal.ROUND_DOWN)
                 .doubleValue();
     }
+
+    public static void validateUseCaseWeightsNotConflict(
+            Map<ManagedMemoryUseCase, Integer> weights1,
+            Map<ManagedMemoryUseCase, Integer> weights2) {
+        weights1.forEach(
+                (useCase, weight1) ->
+                        checkState(
+                                weights2.getOrDefault(useCase, 
weight1).equals(weight1),
+                                String.format(
+                                        "Conflict managed memory consumer 
weights for '%s' were configured: '%d' and '%d'.",
+                                        useCase, weight1, 
weights2.get(useCase))));
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java
index 31ab8fe..8a05c38 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java
@@ -266,4 +266,43 @@ public class ManagedMemoryUtilsTest extends TestLogger {
         assertEquals(expectedStateFractionOfSlot, stateFractionOfSlot, DELTA);
         assertEquals(expectedPythonFractionOfSlot, pythonFractionOfSlot, 
DELTA);
     }
+
+    @Test
+    public void testUseCaseWeightsConfiguredWithConsistentValue() {
+        final Map<ManagedMemoryUseCase, Integer> existingWeights =
+                new HashMap<ManagedMemoryUseCase, Integer>() {
+                    {
+                        put(ManagedMemoryUseCase.OPERATOR, 123);
+                    }
+                };
+
+        final Map<ManagedMemoryUseCase, Integer> newWeights =
+                new HashMap<ManagedMemoryUseCase, Integer>() {
+                    {
+                        put(ManagedMemoryUseCase.OPERATOR, 123);
+                        put(ManagedMemoryUseCase.STATE_BACKEND, 456);
+                    }
+                };
+
+        ManagedMemoryUtils.validateUseCaseWeightsNotConflict(existingWeights, 
newWeights);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testUseCaseWeightsConfiguredWithConflictValue() {
+        final Map<ManagedMemoryUseCase, Integer> existingWeights =
+                new HashMap<ManagedMemoryUseCase, Integer>() {
+                    {
+                        put(ManagedMemoryUseCase.OPERATOR, 123);
+                    }
+                };
+
+        final Map<ManagedMemoryUseCase, Integer> newWeights =
+                new HashMap<ManagedMemoryUseCase, Integer>() {
+                    {
+                        put(ManagedMemoryUseCase.OPERATOR, 456);
+                    }
+                };
+
+        ManagedMemoryUtils.validateUseCaseWeightsNotConflict(existingWeights, 
newWeights);
+    }
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java
index 772cf2b..c40c5e9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.util.graph.StreamGraphUtils;
 
 import java.util.Collection;
 
+import static 
org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils.validateUseCaseWeightsNotConflict;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -116,9 +117,10 @@ public abstract class SimpleTransformationTranslator<OUT, 
T extends Transformati
         }
 
         final StreamNode streamNode = 
streamGraph.getStreamNode(transformationId);
-        if (streamNode != null
-                && 
streamNode.getManagedMemoryOperatorScopeUseCaseWeights().isEmpty()
-                && streamNode.getManagedMemorySlotScopeUseCases().isEmpty()) {
+        if (streamNode != null) {
+            validateUseCaseWeightsNotConflict(
+                    streamNode.getManagedMemoryOperatorScopeUseCaseWeights(),
+                    
transformation.getManagedMemoryOperatorScopeUseCaseWeights());
             streamNode.setManagedMemoryUseCaseWeights(
                     
transformation.getManagedMemoryOperatorScopeUseCaseWeights(),
                     transformation.getManagedMemorySlotScopeUseCases());

Reply via email to