1996fanrui commented on code in PR #751:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/751#discussion_r1448948315


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -247,6 +254,69 @@ private boolean isJobUnderMemoryPressure(
         return false;
     }
 
+    private boolean scalingWouldExceedClusterResources(
+            EvaluatedMetrics evaluatedMetrics,
+            Map<JobVertexID, ScalingSummary> scalingSummaries,
+            Context ctx) {
+
+        final double taskManagerCpu = ctx.getTaskManagerCpu();
+        final double taskManagerMemory = ctx.getTaskManagerMemory();
+
+        if (taskManagerCpu <= 0 || taskManagerMemory <= 0) {
+            // We can't extract the requirements, we can't make any assumptions
+            return false;
+        }
+
+        var globalMetrics = evaluatedMetrics.getGlobalMetrics();
+        if (!(globalMetrics.containsKey(ScalingMetric.NUM_TASK_MANAGERS)
+                && 
globalMetrics.containsKey(ScalingMetric.NUM_TOTAL_TASK_SLOTS)
+                && 
globalMetrics.containsKey(ScalingMetric.NUM_TASK_SLOTS_USED))) {
+            LOG.info("JM metrics not ready yet");
+            return true;
+        }
+
+        var vertexMetrics = evaluatedMetrics.getVertexMetrics();
+
+        int oldParallelismSum =
+                vertexMetrics.values().stream()
+                        .map(map -> (int) 
map.get(ScalingMetric.PARALLELISM).getCurrent())
+                        .reduce(0, Integer::sum);
+
+        Map<JobVertexID, Integer> newParallelisms = new HashMap<>();
+        for (Map.Entry<JobVertexID, Map<ScalingMetric, 
EvaluatedScalingMetric>> entry :
+                vertexMetrics.entrySet()) {
+            JobVertexID jobVertexID = entry.getKey();
+            ScalingSummary scalingSummary = scalingSummaries.get(jobVertexID);
+            if (scalingSummary != null) {
+                newParallelisms.put(jobVertexID, 
scalingSummary.getNewParallelism());
+            } else {
+                newParallelisms.put(
+                        jobVertexID,
+                        (int) 
entry.getValue().get(ScalingMetric.PARALLELISM).getCurrent());
+            }
+        }
+
+        double numTaskSlotsUsed = 
globalMetrics.get(ScalingMetric.NUM_TASK_SLOTS_USED).getCurrent();
+
+        final int numTaskSlotsAfterRescale;
+        if (oldParallelismSum == numTaskSlotsUsed) {
+            // Slot sharing activated
+            numTaskSlotsAfterRescale = 
newParallelisms.values().stream().reduce(0, Integer::sum);

Review Comment:
   This case could also occur when resources are insufficient and the 
adaptive scheduler is in use, right?



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java:
##########
@@ -247,6 +254,69 @@ private boolean isJobUnderMemoryPressure(
         return false;
     }
 
+    private boolean scalingWouldExceedClusterResources(
+            EvaluatedMetrics evaluatedMetrics,
+            Map<JobVertexID, ScalingSummary> scalingSummaries,
+            Context ctx) {
+
+        final double taskManagerCpu = ctx.getTaskManagerCpu();
+        final double taskManagerMemory = ctx.getTaskManagerMemory();
+
+        if (taskManagerCpu <= 0 || taskManagerMemory <= 0) {
+            // We can't extract the requirements, we can't make any assumptions
+            return false;
+        }
+
+        var globalMetrics = evaluatedMetrics.getGlobalMetrics();
+        if (!(globalMetrics.containsKey(ScalingMetric.NUM_TASK_MANAGERS)
+                && 
globalMetrics.containsKey(ScalingMetric.NUM_TOTAL_TASK_SLOTS)
+                && 
globalMetrics.containsKey(ScalingMetric.NUM_TASK_SLOTS_USED))) {
+            LOG.info("JM metrics not ready yet");
+            return true;
+        }
+
+        var vertexMetrics = evaluatedMetrics.getVertexMetrics();
+
+        int oldParallelismSum =
+                vertexMetrics.values().stream()
+                        .map(map -> (int) 
map.get(ScalingMetric.PARALLELISM).getCurrent())
+                        .reduce(0, Integer::sum);
+
+        Map<JobVertexID, Integer> newParallelisms = new HashMap<>();
+        for (Map.Entry<JobVertexID, Map<ScalingMetric, 
EvaluatedScalingMetric>> entry :
+                vertexMetrics.entrySet()) {
+            JobVertexID jobVertexID = entry.getKey();
+            ScalingSummary scalingSummary = scalingSummaries.get(jobVertexID);
+            if (scalingSummary != null) {
+                newParallelisms.put(jobVertexID, 
scalingSummary.getNewParallelism());
+            } else {
+                newParallelisms.put(
+                        jobVertexID,
+                        (int) 
entry.getValue().get(ScalingMetric.PARALLELISM).getCurrent());
+            }
+        }
+
+        double numTaskSlotsUsed = 
globalMetrics.get(ScalingMetric.NUM_TASK_SLOTS_USED).getCurrent();
+
+        final int numTaskSlotsAfterRescale;
+        if (oldParallelismSum == numTaskSlotsUsed) {
+            // Slot sharing activated
+            numTaskSlotsAfterRescale = 
newParallelisms.values().stream().reduce(0, Integer::sum);
+        } else {
+            // Assuming slot sharing is not activated
+            numTaskSlotsAfterRescale = 
newParallelisms.values().stream().reduce(0, Integer::max);

Review Comment:
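   Are `max` and `sum` reversed here? With slot sharing, the number of slots 
needed equals the maximum vertex parallelism; without it, the sum of all 
parallelisms.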



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to