gyfora commented on code in PR #686:
URL: https://github.com/apache/flink-kubernetes-operator/pull/686#discussion_r1369989166


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##########
@@ -332,30 +357,51 @@ protected Map<JobVertexID, Map<String, FlinkMetric>> 
queryFilteredMetricNames(
                                     && previousMetricNames
                                             .keySet()
                                             
.equals(topology.getParallelisms().keySet())) {
-                                // We have already gathered the metric names 
for this topology
-                                return previousMetricNames;
+                                var newMetricNames = new 
HashMap<>(previousMetricNames);
+                                var sourceMetricNames =
+                                        queryFilteredMetricNames(
+                                                ctx,
+                                                topology,
+                                                vertices.stream()
+                                                        
.filter(topology::isSource)
+                                                        .filter(
+                                                                v ->
+                                                                        
!topology.getFinishedVertices()
+                                                                               
 .contains(v)));
+                                newMetricNames.putAll(sourceMetricNames);
+                                return newMetricNames;
                             }
 
-                            try (var restClient = ctx.getRestClusterClient()) {
-                                return vertices.stream()
-                                        .filter(v -> 
!topology.getFinishedVertices().contains(v))
-                                        .collect(
-                                                Collectors.toMap(
-                                                        v -> v,
-                                                        v ->
-                                                                
getFilteredVertexMetricNames(
-                                                                        
restClient,
-                                                                        
ctx.getJobID(),
-                                                                        v,
-                                                                        
topology)));
-                            } catch (Exception e) {
-                                throw new RuntimeException(e);
-                            }
+                            // Query all metric names
+                            return queryFilteredMetricNames(
+                                    ctx,
+                                    topology,
+                                    vertices.stream()
+                                            .filter(
+                                                    v ->
+                                                            
!topology.getFinishedVertices()
+                                                                    
.contains(v)));
                         });
         names.keySet().removeAll(topology.getFinishedVertices());
         return names;
     }
 
+    private Map<JobVertexID, Map<String, FlinkMetric>> 
queryFilteredMetricNames(
+            Context ctx, JobTopology topology, Stream<JobVertexID> 
vertexStream) {
+        try (var restClient = ctx.getRestClusterClient()) {
+            return vertexStream
+                    .filter(v -> !topology.getFinishedVertices().contains(v))

Review Comment:
   Good catch, I will remove this filter; the callers already exclude finished vertices before passing the stream in.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to