mxm commented on code in PR #628:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/628#discussion_r1259539220


##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:
##########
@@ -223,16 +220,26 @@ private void updateKafkaSourceMaxParallelisms(
      * Given a map of collected Flink vertex metrics we compute the scaling 
metrics for each job
      * vertex.
      *
-     * @param collectedMetrics Collected metrics for all job vertices.
+     * @param flinkMetrics Collected metrics for all job vertices.
      * @return Computed scaling metrics for all job vertices.
      */
     private CollectedMetrics convertToScalingMetrics(
             ResourceID resourceID,
-            Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> 
collectedMetrics,
+            Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> flinkMetrics,

Review Comment:
   ```suggestion
               Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> 
collectedMetrics,
   ```
   
   Together with my comment above.



##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:
##########
@@ -313,41 +320,37 @@ protected Map<JobVertexID, Map<String, FlinkMetric>> 
queryFilteredMetricNames(
         var jobId = 
JobID.fromHexString(cr.getStatus().getJobStatus().getJobId());
         var vertices = topology.getVerticesInTopologicalOrder();
 
-        long deployedGeneration = getDeployedGeneration(cr);
-
-        var previousMetricNames = 
availableVertexMetricNames.get(ResourceID.fromResource(cr));
-
-        if (previousMetricNames != null) {
-            if (deployedGeneration == previousMetricNames.f0) {
-                // We have already gathered the metric names for this spec, no 
need to query again
-                return previousMetricNames.f1;
-            } else {
-                availableVertexMetricNames.remove(ResourceID.fromResource(cr));
-            }
-        }
-
-        try (var restClient = (RestClusterClient<String>) 
flinkService.getClusterClient(conf)) {
-            var names =
-                    vertices.stream()
-                            .collect(
-                                    Collectors.toMap(
-                                            v -> v,
-                                            v ->
-                                                    
getFilteredVertexMetricNames(
-                                                            restClient, jobId, 
v, topology)));
-            availableVertexMetricNames.put(
-                    ResourceID.fromResource(cr), Tuple2.of(deployedGeneration, 
names));
-            return names;
-        }
-    }
-
-    public static long getDeployedGeneration(AbstractFlinkResource<?, ?> cr) {
-        return cr.getStatus()
-                .getReconciliationStatus()
-                .deserializeLastReconciledSpecWithMeta()
-                .getMeta()
-                .getMetadata()
-                .getGeneration();
+        var resourceId = ResourceID.fromResource(cr);
+        var names =
+                availableVertexMetricNames.compute(
+                        resourceId,
+                        (k, previousMetricNames) -> {
+                            if (previousMetricNames != null
+                                    && previousMetricNames
+                                            .keySet()
+                                            
.equals(topology.getParallelisms().keySet())) {
+                                // We have already gathered the metric names 
for this topology
+                                return previousMetricNames;

Review Comment:
   Could it be that metrics are added late and we are missing some in the first 
attempt?



##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java:
##########
@@ -67,4 +72,18 @@ public static double getTargetProcessingCapacity(
 
         return Math.round(lagCatchupTargetRate + restartCatchupRate + 
inputTargetAtUtilization);
     }
+
+    /** Temporarily exclude vertex from scaling for this run. This does not 
update the spec. */
+    public static boolean excludeVertexFromScaling(Configuration conf, 
JobVertexID jobVertexId) {
+        Set<String> excludedIds = new 
HashSet<>(conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS));
+        boolean added = excludedIds.add(jobVertexId.toHexString());
+        conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, new 
ArrayList<>(excludedIds));
+        return added;
+    }

Review Comment:
   ```suggestion
       public static boolean excludeVertexFromScaling(Configuration conf, 
JobVertexID jobVertexId) {
           excludeVerticesFromScaling(conf, 
Collections.singletonList(jobVertexId));
       }
   ```



##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java:
##########
@@ -223,16 +220,26 @@ private void updateKafkaSourceMaxParallelisms(
      * Given a map of collected Flink vertex metrics we compute the scaling 
metrics for each job
      * vertex.
      *
-     * @param collectedMetrics Collected metrics for all job vertices.
+     * @param flinkMetrics Collected metrics for all job vertices.
      * @return Computed scaling metrics for all job vertices.
      */
     private CollectedMetrics convertToScalingMetrics(
             ResourceID resourceID,
-            Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> 
collectedMetrics,
+            Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> flinkMetrics,
             JobTopology jobTopology,
             Configuration conf) {
 
         var out = new HashMap<JobVertexID, Map<ScalingMetric, Double>>();
+
+        var finishedVertices = jobTopology.getFinishedVertices();
+        Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> collectedMetrics;
+        if (finishedVertices.isEmpty()) {
+            collectedMetrics = flinkMetrics;
+        } else {
+            collectedMetrics = new HashMap<>(flinkMetrics);
+            finishedVertices.forEach(v -> collectedMetrics.put(v, 
FlinkMetric.FINISHED_METRICS));
+        }

Review Comment:
   ```suggestion
           if (!finishedVertices.isEmpty()) {
               collectedMetrics = new HashMap<>(collectedMetrics);
               finishedVertices.forEach(v -> collectedMetrics.put(v, 
FlinkMetric.FINISHED_METRICS));
           }
   ```
   
   I would prefer this over adding an extra variable because it is easier to 
read for me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to