This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 456fb6e9 [FLINK-33966][autoscaler] Improve the comments of
getNumRecordsInPerSecond (#743)
456fb6e9 is described below
commit 456fb6e9a816d7f4bebdceb5fcac4cd13cace1a4
Author: Yang-LI-CS <[email protected]>
AuthorDate: Thu Jan 4 10:39:23 2024 +0100
[FLINK-33966][autoscaler] Improve the comments of getNumRecordsInPerSecond
(#743)
---------
Co-authored-by: Rui Fan <[email protected]>
Co-authored-by: Maximilian Michels <[email protected]>
---
.../java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java | 7 +++++++
1 file changed, 7 insertions(+)
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
index c92cf464..c1548f87 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
@@ -216,15 +216,22 @@ public class ScalingMetrics {
Map<FlinkMetric, AggregatedMetric> flinkMetrics,
JobVertexID jobVertexID,
boolean isSource) {
+ // Generate numRecordsInPerSecond from 3 metrics:
+ // 1. If available, directly use the NUM_RECORDS_IN_PER_SEC task
metric.
var numRecordsInPerSecond =
flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
+ // 2. If the former is unavailable and the vertex contains a source
operator, use the
+ // corresponding source operator metric.
if (isSource && (numRecordsInPerSecond == null ||
numRecordsInPerSecond.getSum() == 0)) {
numRecordsInPerSecond =
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
}
+ // 3. If the vertex contains a source operator which does not emit
input metrics, use output
+ // metrics instead.
if (isSource && (numRecordsInPerSecond == null ||
numRecordsInPerSecond.getSum() == 0)) {
numRecordsInPerSecond =
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
}
+
if (numRecordsInPerSecond == null) {
LOG.warn("Received null input rate for {}. Returning NaN.",
jobVertexID);
return Double.NaN;