This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 456fb6e9 [FLINK-33966][autoscaler] Improve the comments of getNumRecordsInPerSecond (#743)
456fb6e9 is described below

commit 456fb6e9a816d7f4bebdceb5fcac4cd13cace1a4
Author: Yang-LI-CS <[email protected]>
AuthorDate: Thu Jan 4 10:39:23 2024 +0100

    [FLINK-33966][autoscaler] Improve the comments of getNumRecordsInPerSecond (#743)
    
    ---------
    
    Co-authored-by: Rui Fan <[email protected]>
    Co-authored-by: Maximilian Michels <[email protected]>
---
 .../java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java   | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
index c92cf464..c1548f87 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
@@ -216,15 +216,22 @@ public class ScalingMetrics {
             Map<FlinkMetric, AggregatedMetric> flinkMetrics,
             JobVertexID jobVertexID,
             boolean isSource) {
+        // Generate numRecordsInPerSecond from 3 metrics:
+        // 1. If available, directly use the NUM_RECORDS_IN_PER_SEC task metric.
         var numRecordsInPerSecond = 
flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
+        // 2. If the former is unavailable and the vertex contains a source operator, use the
+        // corresponding source operator metric.
         if (isSource && (numRecordsInPerSecond == null || 
numRecordsInPerSecond.getSum() == 0)) {
             numRecordsInPerSecond =
                     
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
         }
+        // 3. If the vertex contains a source operator which does not emit input metrics, use output
+        // metrics instead.
         if (isSource && (numRecordsInPerSecond == null || 
numRecordsInPerSecond.getSum() == 0)) {
             numRecordsInPerSecond =
                     
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
         }
+
         if (numRecordsInPerSecond == null) {
             LOG.warn("Received null input rate for {}. Returning NaN.", 
jobVertexID);
             return Double.NaN;

Reply via email to