KarmaGYZ commented on a change in pull request #13397:
URL: https://github.com/apache/flink/pull/13397#discussion_r489294355



##########
File path: flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
##########
@@ -139,12 +142,12 @@ public static int getNewNodeId() {
        private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
 
        /**
-        * This weight indicates how much this transformation relies on managed 
memory, so that
-        * transformation highly relies on managed memory would be able to 
acquire more managed
-        * memory in runtime (linear association). Note that it only works in 
cases of UNKNOWN
-        * resources.
+        * Each entry in this map represents a use case that this 
transformation needs managed memory for. The key of the
+        * entry indicates the use case, while the value is the 
use-case-specific weight for this transformation. Managed
+        * memory reserved for an OPERATOR scope use case will be shared by all 
the declaring transformations within a slot
+        * according to this weight. For SLOT scope use cases, the weights are 
ignored.
         */
-       private int managedMemoryWeight = DEFAULT_MANAGED_MEMORY_WEIGHT;
+       private final Map<ManagedMemoryUseCase, Integer> 
managedMemoryUseCaseWeights = new HashMap<>();

Review comment:
       Just trying to understand, is it illegal to put `ROCKSDB` or `PYTHON` in 
this map in your design? If so, should we check the sanity if someone tries to 
do it?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -849,26 +843,23 @@ private static void 
setManagedMemoryFractionForSlotSharingGroup(
        }
 
        private static void setManagedMemoryFractionForOperator(
-                       final ResourceSpec operatorResourceSpec,
                        final ResourceSpec groupResourceSpec,
                        final int operatorManagedMemoryWeight,
                        final int groupManagedMemoryWeight,
                        final StreamConfig operatorConfig) {
 
-               final double managedMemoryFraction;
-
                if (groupResourceSpec.equals(ResourceSpec.UNKNOWN)) {
-                       managedMemoryFraction = groupManagedMemoryWeight > 0
-                               ? 
getFractionRoundedDown(operatorManagedMemoryWeight, groupManagedMemoryWeight)
-                               : 0.0;
+                       operatorConfig.setManagedMemoryFraction(
+                                       groupManagedMemoryWeight > 0 ?
+                                                       
getFractionRoundedDown(operatorManagedMemoryWeight, groupManagedMemoryWeight) :
+                                                       0.0);
                } else {
-                       final long groupManagedMemoryBytes = 
groupResourceSpec.getManagedMemory().getBytes();
-                       managedMemoryFraction = groupManagedMemoryBytes > 0
-                               ? 
getFractionRoundedDown(operatorResourceSpec.getManagedMemory().getBytes(), 
groupManagedMemoryBytes)
-                               : 0.0;
+                       // Supporting for fine grained resource specs is still 
under developing.
+                       // This branch should not be executed in production. 
Not throwing exception for testing purpose.
+                       LOG.error("Failed setting managed memory fractions. " +
+                                       " Operators may not be able to use 
managed memory properly." +
+                                       " Calculating managed memory fractions 
with fine grained resource spec is currently not supported.");

Review comment:
       I agree that the solution for supporting fine-grained resource specs 
should not block this work. However, we might create a ticket to track that 
work and mention it in the comment. We could also add the `TODO` label to the 
comment.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -849,26 +843,23 @@ private static void 
setManagedMemoryFractionForSlotSharingGroup(
        }
 
        private static void setManagedMemoryFractionForOperator(
-                       final ResourceSpec operatorResourceSpec,
                        final ResourceSpec groupResourceSpec,
                        final int operatorManagedMemoryWeight,
                        final int groupManagedMemoryWeight,
                        final StreamConfig operatorConfig) {
 
-               final double managedMemoryFraction;
-
                if (groupResourceSpec.equals(ResourceSpec.UNKNOWN)) {
-                       managedMemoryFraction = groupManagedMemoryWeight > 0
-                               ? 
getFractionRoundedDown(operatorManagedMemoryWeight, groupManagedMemoryWeight)
-                               : 0.0;
+                       operatorConfig.setManagedMemoryFraction(
+                                       groupManagedMemoryWeight > 0 ?
+                                                       
getFractionRoundedDown(operatorManagedMemoryWeight, groupManagedMemoryWeight) :
+                                                       0.0);
                } else {
-                       final long groupManagedMemoryBytes = 
groupResourceSpec.getManagedMemory().getBytes();
-                       managedMemoryFraction = groupManagedMemoryBytes > 0
-                               ? 
getFractionRoundedDown(operatorResourceSpec.getManagedMemory().getBytes(), 
groupManagedMemoryBytes)
-                               : 0.0;
+                       // Supporting for fine grained resource specs is still 
under developing.
+                       // This branch should not be executed in production. 
Not throwing exception for testing purpose.
+                       LOG.error("Failed setting managed memory fractions. " +
+                                       " Operators may not be able to use 
managed memory properly." +
+                                       " Calculating managed memory fractions 
with fine grained resource spec is currently not supported.");

Review comment:
       I know ATM we will never go into this branch by design, but does it make 
sense to throw an exception here?

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
##########
@@ -401,6 +403,22 @@
                        .withDescription("Fraction of Total Flink Memory to be 
used as Managed Memory, if Managed Memory size is not"
                                + " explicitly specified.");
 
+       /**
+        * Weights of managed memory consumers.
+        */
+       // Do not advertise this option until the feature is completed.
+       @Documentation.ExcludeFromDocumentation
+       public static final ConfigOption<Map<String, String>> 
MANAGED_MEMORY_CONSUMER_WEIGHTS =
+               key("taskmanager.memory.managed.consumer-weights")
+                       .mapType()
+                       .defaultValue(new HashMap<String, String>() {{
+                               put("DATAPROC", "70");
+                               put("PYTHON", "30");

Review comment:
       It would be good to have an `enum` which holds all supported kinds.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
##########
@@ -49,6 +48,8 @@
 @Internal
 public class StreamNode implements Serializable {
 
+       public static final int DEFAULT_MANAGED_MEMORY_WEIGHT = 1;

Review comment:
       Could be package-private.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -177,11 +178,13 @@ private JobGraph createJobGraph() {
 
                setSlotSharingAndCoLocation();
 
+               // For now, only consider managed memory for batch algorithms.
+               // TODO: extend managed memory fraction calculations w.r.t. 
various managed memory use cases.
                setManagedMemoryFraction(
                        Collections.unmodifiableMap(jobVertices),
                        Collections.unmodifiableMap(vertexConfigs),
                        Collections.unmodifiableMap(chainedConfigs),
-                       id -> 
streamGraph.getStreamNode(id).getManagedMemoryWeight());
+                       id -> 
streamGraph.getStreamNode(id).getManagedMemoryUseCaseWeights().getOrDefault(ManagedMemoryUseCase.BATCH_OP,
 0));

Review comment:
       We probably need a static variable for the default weight.

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
##########
@@ -557,10 +557,11 @@ public void testSetManagedMemoryWeight() {
 
                final StreamGraph streamGraph = env.getStreamGraph();
                for (StreamNode streamNode : streamGraph.getStreamNodes()) {
-                       final int expectedWeight = 
streamNode.getOperatorName().contains("source")
-                               ? 123
-                               : StreamNode.DEFAULT_MANAGED_MEMORY_WEIGHT;
-                       assertEquals(expectedWeight, 
streamNode.getManagedMemoryWeight());
+                       if (streamNode.getOperatorName().contains("source")) {
+                               
assertThat(streamNode.getManagedMemoryUseCaseWeights().get(ManagedMemoryUseCase.BATCH_OP),
 is(123));
+                       } else {
+                               
assertThat(streamNode.getManagedMemoryUseCaseWeights().size(), is(0));

Review comment:
       Then, the expected value should be a reference instead of a specific 
value.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to