mxm commented on code in PR #726:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1422597784
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
##########
@@ -78,19 +91,87 @@ protected Map<FlinkMetric, AggregatedMetric>
queryAggregatedVertexMetrics(
EmptyRequestBody.getInstance())
.get();
- return responseBody.getMetrics().stream()
- .collect(
- Collectors.toMap(
- m -> metrics.get(m.getId()),
- m -> m,
- (m1, m2) ->
- new AggregatedMetric(
- m1.getId() + " merged with
" + m2.getId(),
- Math.min(m1.getMin(),
m2.getMin()),
- Math.max(m1.getMax(),
m2.getMax()),
- // Average can't be
computed
- Double.NaN,
- m1.getSum() +
m2.getSum())));
+ return aggregateByFlinkMetric(metrics, responseBody);
}
}
+
+ protected Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx)
throws Exception {
+ try (var restClient = ctx.getRestClusterClient()) {
+ var metricNames = getTmMetricNames(restClient, ctx);
+ var metricNameMapping = new HashMap<String, FlinkMetric>();
+
+ REQUIRED_TM_METRICS.forEach(
+ fm -> {
+ var name =
+ fm.findAny(metricNames)
+ .orElseThrow(
+ () ->
+ new RuntimeException(
+ "Could not
find required TM metric "
+ +
fm.name()));
+ metricNameMapping.put(name, fm);
+ });
Review Comment:
Oh I missed that. Thanks for clarifying.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]