This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0e25604 [FLINK-21916] Allows multiple kinds of ManagedMemoryUseCase
for the same operator
0e25604 is described below
commit 0e2560402e5f7c4d24cb9b3b05d79065bb97af6d
Author: Dian Fu <[email protected]>
AuthorDate: Mon Mar 22 18:15:20 2021 +0800
[FLINK-21916] Allows multiple kinds of ManagedMemoryUseCase for the same
operator
This closes #15337.
---
.../util/config/memory/ManagedMemoryUtils.java | 14 ++++++++
.../util/config/memory/ManagedMemoryUtilsTest.java | 39 ++++++++++++++++++++++
.../api/graph/SimpleTransformationTranslator.java | 8 +++--
3 files changed, 58 insertions(+), 3 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java
index 3b54a93..850a7fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java
@@ -39,6 +39,8 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import static org.apache.flink.util.Preconditions.checkState;
+
/** Utils for configuration and calculations related to managed memory and its various use cases. */
public enum ManagedMemoryUtils {
;
@@ -155,4 +157,16 @@ public enum ManagedMemoryUtils {
BigDecimal.ROUND_DOWN)
.doubleValue();
}
+
+ public static void validateUseCaseWeightsNotConflict(
+ Map<ManagedMemoryUseCase, Integer> weights1,
+ Map<ManagedMemoryUseCase, Integer> weights2) {
+ weights1.forEach(
+ (useCase, weight1) ->
+ checkState(
+ weights2.getOrDefault(useCase, weight1).equals(weight1),
+ String.format(
+ "Conflict managed memory consumer weights for '%s' were configured: '%d' and '%d'.",
+ useCase, weight1, weights2.get(useCase))));
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java
index 31ab8fe..8a05c38 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtilsTest.java
@@ -266,4 +266,43 @@ public class ManagedMemoryUtilsTest extends TestLogger {
assertEquals(expectedStateFractionOfSlot, stateFractionOfSlot, DELTA);
assertEquals(expectedPythonFractionOfSlot, pythonFractionOfSlot, DELTA);
}
+
+ @Test
+ public void testUseCaseWeightsConfiguredWithConsistentValue() {
+ final Map<ManagedMemoryUseCase, Integer> existingWeights =
+ new HashMap<ManagedMemoryUseCase, Integer>() {
+ {
+ put(ManagedMemoryUseCase.OPERATOR, 123);
+ }
+ };
+
+ final Map<ManagedMemoryUseCase, Integer> newWeights =
+ new HashMap<ManagedMemoryUseCase, Integer>() {
+ {
+ put(ManagedMemoryUseCase.OPERATOR, 123);
+ put(ManagedMemoryUseCase.STATE_BACKEND, 456);
+ }
+ };
+
+ ManagedMemoryUtils.validateUseCaseWeightsNotConflict(existingWeights, newWeights);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testUseCaseWeightsConfiguredWithConflictValue() {
+ final Map<ManagedMemoryUseCase, Integer> existingWeights =
+ new HashMap<ManagedMemoryUseCase, Integer>() {
+ {
+ put(ManagedMemoryUseCase.OPERATOR, 123);
+ }
+ };
+
+ final Map<ManagedMemoryUseCase, Integer> newWeights =
+ new HashMap<ManagedMemoryUseCase, Integer>() {
+ {
+ put(ManagedMemoryUseCase.OPERATOR, 456);
+ }
+ };
+
+ ManagedMemoryUtils.validateUseCaseWeightsNotConflict(existingWeights, newWeights);
+ }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java
index 772cf2b..c40c5e9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/SimpleTransformationTranslator.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.util.graph.StreamGraphUtils;
import java.util.Collection;
+import static org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils.validateUseCaseWeightsNotConflict;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -116,9 +117,10 @@ public abstract class SimpleTransformationTranslator<OUT, T extends Transformati
}
final StreamNode streamNode = streamGraph.getStreamNode(transformationId);
- if (streamNode != null
- && streamNode.getManagedMemoryOperatorScopeUseCaseWeights().isEmpty()
- && streamNode.getManagedMemorySlotScopeUseCases().isEmpty()) {
+ if (streamNode != null) {
+ validateUseCaseWeightsNotConflict(
+ streamNode.getManagedMemoryOperatorScopeUseCaseWeights(),
+ transformation.getManagedMemoryOperatorScopeUseCaseWeights());
streamNode.setManagedMemoryUseCaseWeights(
transformation.getManagedMemoryOperatorScopeUseCaseWeights(),
transformation.getManagedMemorySlotScopeUseCases());