1996fanrui commented on code in PR #660:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/660#discussion_r1309762404
##########
flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerFlinkMetricsTest.java:
##########
@@ -66,8 +67,10 @@ public void testMetricsRegistration() {
initRecommendedParallelism(evaluatedMetrics);
lastEvaluatedMetrics.put(resourceID, evaluatedMetrics);
- metrics.registerScalingMetrics(() -> lastEvaluatedMetrics.get(resourceID));
- metrics.registerScalingMetrics(() -> lastEvaluatedMetrics.get(resourceID));
+ metrics.registerScalingMetrics(
+ () -> List.of(jobVertexID), () -> lastEvaluatedMetrics.get(resourceID));
+ metrics.registerScalingMetrics(
+ () -> List.of(jobVertexID), () -> lastEvaluatedMetrics.get(resourceID));
Review Comment:
Could you help add a test verifying that some metrics are collected after
`registerScalingMetrics`?
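A minimal sketch of such a test, assuming a dependency-free stand-in rather than the real Flink API: a plain `Map<String, Supplier<Double>>` replaces the `MetricGroup`, job vertices are plain strings, and the class name `ScalingMetricsRegistrationSketch` and the `<vertex>.CURRENT` gauge keys are hypothetical. It checks that gauges exist after `registerScalingMetrics`, that a second call registers nothing new, and that the gauges follow later updates to the evaluated metrics.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical stand-in for AutoscalerFlinkMetrics: gauges live in a plain map
// instead of a Flink MetricGroup, and each vertex has a single "current" value.
class ScalingMetricsRegistrationSketch {

    // Registered gauges keyed by "<vertexId>.CURRENT"; stands in for a MetricGroup.
    final Map<String, Supplier<Double>> gauges = new HashMap<>();
    private boolean scalingMetricsInitialized;

    void registerScalingMetrics(
            Supplier<List<String>> jobVerticesSupplier,
            Supplier<Map<String, Double>> metricsSupplier) {
        if (scalingMetricsInitialized) {
            // Register only once; later values are read lazily through metricsSupplier.
            return;
        }
        scalingMetricsInitialized = true;
        for (String vertex : jobVerticesSupplier.get()) {
            gauges.put(
                    vertex + ".CURRENT",
                    () ->
                            Optional.ofNullable(metricsSupplier.get())
                                    .map(m -> m.get(vertex))
                                    .orElse(Double.NaN));
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, Double>> lastEvaluated = new HashMap<>();
        lastEvaluated.put("job-1", Map.of("v1", 2.0));

        var metrics = new ScalingMetricsRegistrationSketch();
        metrics.registerScalingMetrics(() -> List.of("v1"), () -> lastEvaluated.get("job-1"));
        metrics.registerScalingMetrics(() -> List.of("v1"), () -> lastEvaluated.get("job-1"));

        // The gauge is registered exactly once and reports the current value.
        check(metrics.gauges.size() == 1, "expected one gauge");
        check(metrics.gauges.get("v1.CURRENT").get() == 2.0, "expected 2.0");

        // Updated evaluated metrics are visible without re-registering.
        lastEvaluated.put("job-1", Map.of("v1", 5.0));
        check(metrics.gauges.get("v1.CURRENT").get() == 5.0, "expected 5.0");

        // Missing metrics fall back to NaN.
        lastEvaluated.remove("job-1");
        check(Double.isNaN(metrics.gauges.get("v1.CURRENT").get()), "expected NaN");
        System.out.println("ok");
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }
}
```

In the real test the same assertions would be made against the gauges registered on the test metric group.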
##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java:
##########
@@ -62,54 +61,49 @@ public AutoscalerFlinkMetrics(MetricGroup metricGroup) {
}
public void registerScalingMetrics(
+ Supplier<List<JobVertexID>> jobVerticesSupplier,
Supplier<Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>>
- currentVertexMetrics) {
- currentVertexMetrics
- .get()
- .forEach(
- (jobVertexID, evaluated) -> {
- if (!vertexMetrics.add(jobVertexID)) {
- return;
- }
- LOG.info("Registering scaling metrics for job vertex {}", jobVertexID);
- var jobVertexMg =
- metricGroup.addGroup(JOB_VERTEX_ID, jobVertexID.toHexString());
-
- evaluated.forEach(
- (sm, esm) -> {
- var smGroup = jobVertexMg.addGroup(sm.name());
-
- smGroup.gauge(
- CURRENT,
- () ->
- Optional.ofNullable(
- currentVertexMetrics.get())
- .map(m -> m.get(jobVertexID))
- .map(metrics -> metrics.get(sm))
- .map(
- EvaluatedScalingMetric
- ::getCurrent)
- .orElse(Double.NaN));
-
- if (sm.isCalculateAverage()) {
- smGroup.gauge(
- AVERAGE,
- () ->
- Optional.ofNullable(
- currentVertexMetrics
- .get())
- .map(m -> m.get(jobVertexID))
- .map(metrics -> metrics.get(sm))
- .map(
- EvaluatedScalingMetric
- ::getAverage)
- .orElse(Double.NaN));
- }
- });
- });
+ metricsSupplier) {
+ if (scalingMetricsInitialized) {
+ // It is important that we only initialize these metrics once because the Flink API does
+ // not support registering counters / gauges multiple times.
+ // The metrics will be updated via the provided metricsSupplier.
+ return;
+ }
+ scalingMetricsInitialized = true;
+
+ LOG.info("Registering scaling metrics");
+ for (JobVertexID jobVertexID : jobVerticesSupplier.get()) {
Review Comment:
`jobVerticesSupplier.get()` is called only once, and not lazily, so the
`jobVerticesSupplier` parameter can be simplified to `jobVertices`, right?
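To illustrate the point with a sketch (not the operator's actual code): below, the vertex list is passed as a plain `List` and consumed once at registration, while the metrics map stays behind a `Supplier` because each gauge re-reads it whenever it is reported. Vertex IDs are plain strings, a `Map<String, Supplier<Double>>` stands in for Flink's `MetricGroup`, and `EagerVertexListSketch` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical stand-in: string vertex IDs, and a map of named gauges in place
// of Flink's MetricGroup.
class EagerVertexListSketch {
    final Map<String, Supplier<Double>> gauges = new HashMap<>();
    private boolean scalingMetricsInitialized;

    // jobVertices is consumed eagerly, exactly once, so a plain List is enough.
    // metricsSupplier stays lazy because every gauge re-reads it when reported.
    void registerScalingMetrics(
            List<String> jobVertices, Supplier<Map<String, Double>> metricsSupplier) {
        if (scalingMetricsInitialized) {
            return; // gauges must only be registered once
        }
        scalingMetricsInitialized = true;
        for (String vertex : jobVertices) {
            gauges.put(
                    vertex,
                    () ->
                            Optional.ofNullable(metricsSupplier.get())
                                    .map(m -> m.get(vertex))
                                    .orElse(Double.NaN));
        }
    }

    public static void main(String[] args) {
        int[] metricsReads = {0};
        Map<String, Double> current = new HashMap<>(Map.of("v1", 1.5));

        var sketch = new EagerVertexListSketch();
        sketch.registerScalingMetrics(
                List.of("v1"),
                () -> {
                    metricsReads[0]++; // count every lazy read of the metrics
                    return current;
                });

        System.out.println(metricsReads[0]); // 0: registration does not read the metrics
        sketch.gauges.get("v1").get(); // first report
        current.put("v1", 3.0);
        System.out.println(sketch.gauges.get("v1").get()); // 3.0
        System.out.println(metricsReads[0]); // 2: one read per report
    }
}
```

In this sketch only the metrics argument is needed lazily; the vertex list is fixed at registration time, so wrapping it in a `Supplier` buys nothing.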
##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java:
##########
@@ -62,54 +61,49 @@ public AutoscalerFlinkMetrics(MetricGroup metricGroup) {
+ LOG.info("Registering scaling metrics");
Review Comment:
Is this log necessary?
If yes, could we add the resourceId (similar to the job information)? Otherwise
we don't know which job is registering scaling metrics.
--
This is an automated message from the Apache Git Service.