xintongsong commented on a change in pull request #15337:
URL: https://github.com/apache/flink/pull/15337#discussion_r599389303



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java
##########
@@ -155,4 +155,19 @@ public static double getFractionRoundedDown(final long dividend, final long divi
                         BigDecimal.ROUND_DOWN)
                 .doubleValue();
     }
+
+    public static void validateManagedMemoryUseCaseWeights(
+            Map<ManagedMemoryUseCase, Integer> existingOperatorScopeUseCaseWeights,
+            Map<ManagedMemoryUseCase, Integer> newOperatorScopeUseCaseWeights) {

Review comment:
       It's a bit unclear from the method name what the method validates.
   I'd suggest `validateUseCaseWeightsNoConflict(weights1, weights2)`.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
##########
@@ -209,6 +210,8 @@ public void setResources(ResourceSpec minResources, ResourceSpec preferredResour
     public void setManagedMemoryUseCaseWeights(
             Map<ManagedMemoryUseCase, Integer> operatorScopeUseCaseWeights,
             Set<ManagedMemoryUseCase> slotScopeUseCases) {
+        validateManagedMemoryUseCaseWeights(
+                managedMemoryOperatorScopeUseCaseWeights, operatorScopeUseCaseWeights);

Review comment:
       I think the requirement that no existing weights be overwritten comes
from `TransformationTranslator`, not from `StreamNode`. Therefore, I'd suggest
enforcing this validation in `SimpleTransformationTranslator` rather than here.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ManagedMemoryUtils.java
##########
@@ -155,4 +155,19 @@ public static double getFractionRoundedDown(final long dividend, final long divi
                         BigDecimal.ROUND_DOWN)
                 .doubleValue();
     }
+
+    public static void validateManagedMemoryUseCaseWeights(
+            Map<ManagedMemoryUseCase, Integer> existingOperatorScopeUseCaseWeights,
+            Map<ManagedMemoryUseCase, Integer> newOperatorScopeUseCaseWeights) {
+        for (Map.Entry<ManagedMemoryUseCase, Integer> entry :
+                newOperatorScopeUseCaseWeights.entrySet()) {
+            Integer existingWeight = existingOperatorScopeUseCaseWeights.get(entry.getKey());
+            if (existingWeight != null && !existingWeight.equals(entry.getValue())) {
+                throw new IllegalConfigurationException(
+                        String.format(
+                                "The new value '%d' mismatch with the existing value '%d' for managed memory consumer weight '%s'.",
+                                entry.getValue(), existingWeight, entry.getKey()));
+            }
+        }

Review comment:
       The validation can be simplified as follows.
   ```
   weights1.forEach(
           (useCase, weight) ->
                   checkState(
                           weights2.getOrDefault(useCase, weight).equals(weight), "err msg"));
   ```
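
   For illustration, here is a self-contained sketch of that simplification that can be compiled outside Flink. The `UseCase` enum and the local `checkState` helper are hypothetical stand-ins (assumptions for this example, not the actual Flink classes), and the error message text is only a placeholder.

   ```java
   import java.util.Map;

   class UseCaseWeightsCheck {

       /** Hypothetical stand-in for the real use-case enum; the constants are placeholders. */
       enum UseCase { STATE_BACKEND, PYTHON, OPERATOR }

       /** Local stand-in for a checkState-style precondition helper. */
       static void checkState(boolean condition, String message) {
           if (!condition) {
               throw new IllegalStateException(message);
           }
       }

       /** Throws if a use case is present in both maps with different weights. */
       static void validateUseCaseWeightsNoConflict(
               Map<UseCase, Integer> weights1, Map<UseCase, Integer> weights2) {
           weights1.forEach(
                   (useCase, weight) ->
                           checkState(
                                   // A use case missing from weights2 falls back to its own
                                   // weight, so it never counts as a conflict.
                                   weights2.getOrDefault(useCase, weight).equals(weight),
                                   String.format(
                                           "Conflicting weights for '%s': '%s' vs. '%s'.",
                                           useCase, weight, weights2.get(useCase))));
       }
   }
   ```

   One thing to keep in mind: a `checkState`-style check throws `IllegalStateException`, whereas the loop in the diff throws `IllegalConfigurationException`, so the simplification is only a drop-in replacement if that change of exception type is acceptable.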





