gyfora commented on code in PR #686: URL: https://github.com/apache/flink-kubernetes-operator/pull/686#discussion_r1369989166
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java:
##########
@@ -332,30 +357,51 @@ protected Map<JobVertexID, Map<String, FlinkMetric>> queryFilteredMetricNames(
                             && previousMetricNames
                                     .keySet()
                                     .equals(topology.getParallelisms().keySet())) {
-                        // We have already gathered the metric names for this topology
-                        return previousMetricNames;
+                        var newMetricNames = new HashMap<>(previousMetricNames);
+                        var sourceMetricNames =
+                                queryFilteredMetricNames(
+                                        ctx,
+                                        topology,
+                                        vertices.stream()
+                                                .filter(topology::isSource)
+                                                .filter(
+                                                        v ->
+                                                                !topology.getFinishedVertices()
+                                                                        .contains(v)));
+                        newMetricNames.putAll(sourceMetricNames);
+                        return newMetricNames;
                     }
 
-                    try (var restClient = ctx.getRestClusterClient()) {
-                        return vertices.stream()
-                                .filter(v -> !topology.getFinishedVertices().contains(v))
-                                .collect(
-                                        Collectors.toMap(
-                                                v -> v,
-                                                v ->
-                                                        getFilteredVertexMetricNames(
-                                                                restClient,
-                                                                ctx.getJobID(),
-                                                                v,
-                                                                topology)));
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
-                    }
+                    // Query all metric names
+                    return queryFilteredMetricNames(
+                            ctx,
+                            topology,
+                            vertices.stream()
+                                    .filter(
+                                            v ->
+                                                    !topology.getFinishedVertices()
+                                                            .contains(v)));
                 });
         names.keySet().removeAll(topology.getFinishedVertices());
         return names;
     }
 
+    private Map<JobVertexID, Map<String, FlinkMetric>> queryFilteredMetricNames(
+            Context ctx, JobTopology topology, Stream<JobVertexID> vertexStream) {
+        try (var restClient = ctx.getRestClusterClient()) {
+            return vertexStream
+                    .filter(v -> !topology.getFinishedVertices().contains(v))

Review Comment:
   good catch, will remove it

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org