xintongsong commented on a change in pull request #16771:
URL: https://github.com/apache/flink/pull/16771#discussion_r686442349



##########
File path: flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
##########
@@ -273,7 +273,8 @@ public ResourceSpec getPreferredResources() {
      * @param managedMemoryUseCase The use case that this transformation 
declares needing managed
      *     memory for.
      * @param weight Use-case-specific weights for this transformation. Used 
for sharing managed
-     *     memory across transformations for OPERATOR scope use cases.
+     *     memory across transformations for OPERATOR scope use cases. For 
consistency, the APIs
+     *     declare their weights as a kibibyte value.

Review comment:
       I'm not entirely sure that 1 KiB is a good unit for weights in all use 
cases. E.g., our internal Gemini state backend also uses managed memory on a 
per-operator basis, and sets the weight to the number of states the operator 
maintains.
   
   I'm not saying we should change things for a special internal use case. 
Admittedly, having a consistent unit for all use cases causes no problems 
at the moment, because there is currently only one operator-scope managed 
memory use case. But for future flexibility, maybe we should avoid 
unnecessarily strong assumptions. What we really need here is a consistent 
unit of weight within one specific managed memory use case, rather than 
across all use cases.
   
   In particular, I'd suggest adding the 1 KiB definition of weight to the 
JavaDoc of `ManagedMemoryUseCase#OPERATOR`, and adding a pointer here to remind 
callers to check the weight definition of the declared use case.
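
   As a rough illustration of this suggestion (a sketch only: `UseCaseSketch`, 
`STATE_COUNT` and `weightUnit()` are hypothetical names, not the actual 
`ManagedMemoryUseCase` API), each use case could document its own weight unit:

   ```java
   /** Hypothetical, simplified stand-in for ManagedMemoryUseCase (not the real Flink enum). */
   enum UseCaseSketch {
       /** Weights for this use case are declared as a kibibyte value. */
       OPERATOR("KiB"),

       /** Hypothetical use case whose weight is a plain count, e.g. a number of states. */
       STATE_COUNT("count");

       private final String weightUnit;

       UseCaseSketch(String weightUnit) {
           this.weightUnit = weightUnit;
       }

       /** Unit in which weights for this use case are expressed. */
       String weightUnit() {
           return weightUnit;
       }
   }
   ```

   With the unit defined per use case, the JavaDoc on `Transformation` only 
needs to point callers to the declared use case instead of fixing one unit for 
everything.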

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/BatchExecutionUtils.java
##########
@@ -61,12 +62,23 @@ static void applyBatchExecutionSettings(
                 node.addInputRequirement(i, inputRequirements[i]);
             }
             Map<ManagedMemoryUseCase, Integer> operatorScopeUseCaseWeights = 
new HashMap<>();
-            operatorScopeUseCaseWeights.put(ManagedMemoryUseCase.OPERATOR, 1);
+            operatorScopeUseCaseWeights.put(
+                    ManagedMemoryUseCase.OPERATOR,
+                    deriveMemoryWeight(context.getGraphGeneratorConfig()));
             node.setManagedMemoryUseCaseWeights(
                     operatorScopeUseCaseWeights, Collections.emptySet());
         }
     }
 
+    private static int deriveMemoryWeight(ReadableConfig configuration) {
+        long memoryBytes = 
configuration.get(ExecutionOptions.SORTED_INPUTS_MEMORY).getBytes();
+        if (memoryBytes <= 0) {
+            memoryBytes = 
ExecutionOptions.SORTED_INPUTS_MEMORY.defaultValue().getBytes();
+        }
+        // convert to kibibytes
+        return (int) Math.max(1, memoryBytes >> 10);

Review comment:
       I think this can be simplified as follows.
   - A memory-type config option does not accept negative values.
   - If not specified, it automatically falls back to the default value.
   - You can get the KB value directly from a `MemorySize`.
   ```suggestion
           return (int)
                   Math.max(
                           1, configuration.get(ExecutionOptions.SORTED_INPUTS_MEMORY).getKibiBytes());
   ```
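
   To see how the simplified expression behaves, here is a minimal 
self-contained sketch. `MemorySizeSketch` and `WeightSketch` are hypothetical 
stand-ins so the snippet runs without Flink; the sketch assumes 
`getKibiBytes()` is equivalent to shifting the byte count right by 10, as in 
the original diff.

   ```java
   /** Hypothetical minimal stand-in for Flink's MemorySize, for illustration only. */
   final class MemorySizeSketch {
       private final long bytes;

       MemorySizeSketch(long bytes) {
           this.bytes = bytes;
       }

       /** Assumed to match the original conversion: bytes >> 10. */
       long getKibiBytes() {
           return bytes >> 10;
       }
   }

   final class WeightSketch {
       /** Weight in kibibytes, clamped to at least 1 so tiny sizes do not yield a zero weight. */
       static int deriveMemoryWeight(MemorySizeSketch size) {
           return (int) Math.max(1, size.getKibiBytes());
       }
   }
   ```

   For example, a 128 MiB setting gives a weight of 131072, while anything 
below 1 KiB is clamped to 1. Note that the `int` cast would overflow for sizes 
of 2 TiB and above, which is presumably not a practical concern here.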





