This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new ad8b4abf [FLINK-31326] Consolidate source scaling logic (#546)
ad8b4abf is described below

commit ad8b4abf7334c7f196bd5a43f43e6181c9466574
Author: Maximilian Michels <[email protected]>
AuthorDate: Thu Mar 9 14:25:58 2023 +0100

    [FLINK-31326] Consolidate source scaling logic (#546)
    
    Previous changes removed the requirement for sources to have the 
pendingRecords metric. This allows us to refactor and consolidate the source 
scaling code. We remove the option to disable scaling sources because the 
primary motivation was to skip scaling legacy sources, which we can achieve 
without an extra configuration option.
    
    The refactored source scaling logic works like this:
    
    * Modern sources (busy time available)
    
    Scale the source up based on its busy time. Add extra capacity if lag
    information, provided by the pendingRecords metric, is available; otherwise
    assume zero lag. Cap the parallelism at the maximum number of partitions, if available.
    
    * Legacy sources (busy time unavailable)
    
    Leave the source parallelism unchanged and use the current processing rate 
as the
    target rate downstream.
---
 .../generated/auto_scaler_configuration.html       |   6 -
 examples/autoscaling/autoscaling.yaml              |   1 -
 .../autoscaler/ScalingMetricCollector.java         |  57 +++----
 .../autoscaler/ScalingMetricEvaluator.java         |  10 +-
 .../autoscaler/config/AutoScalerOptions.java       |  11 +-
 .../autoscaler/metrics/MetricAggregator.java       |   9 +-
 .../autoscaler/metrics/ScalingMetrics.java         | 171 ++++++++++-----------
 .../MetricsCollectionAndEvaluationTest.java        |   2 +-
 .../autoscaler/metrics/ScalingMetricsTest.java     |  44 +++---
 9 files changed, 147 insertions(+), 164 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html 
b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index f21a16c8..11461049 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -86,12 +86,6 @@
             <td>Boolean</td>
             <td>Enable vertex scaling execution by the autoscaler. If 
disabled, the autoscaler will only collect metrics and evaluate the suggested 
parallelism for each vertex but will not upgrade the jobs.</td>
         </tr>
-        <tr>
-            
<td><h5>kubernetes.operator.job.autoscaler.scaling.sources.enabled</h5></td>
-            <td style="word-wrap: break-word;">true</td>
-            <td>Boolean</td>
-            <td>Whether to enable scaling source vertices. Source vertices set 
the baseline ingestion rate for the processing based on the backlog size. If 
disabled, only regular job vertices will be scaled and source vertices will be 
unchanged.</td>
-        </tr>
         <tr>
             
<td><h5>kubernetes.operator.job.autoscaler.stabilization.interval</h5></td>
             <td style="word-wrap: break-word;">5 min</td>
diff --git a/examples/autoscaling/autoscaling.yaml 
b/examples/autoscaling/autoscaling.yaml
index 509db5d2..4b332dd1 100644
--- a/examples/autoscaling/autoscaling.yaml
+++ b/examples/autoscaling/autoscaling.yaml
@@ -25,7 +25,6 @@ spec:
   flinkVersion: v1_17
   flinkConfiguration:
     kubernetes.operator.job.autoscaler.enabled: "true"
-    kubernetes.operator.job.autoscaler.scaling.sources.enabled: "false"
     kubernetes.operator.job.autoscaler.stabilization.interval: "1m"
     kubernetes.operator.job.autoscaler.metrics.window: "3m"
     pipeline.max-parallelism: "24"
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index c9265387..38868c6b 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -42,7 +42,6 @@ import org.apache.flink.util.Preconditions;
 
 import io.javaoperatorsdk.operator.processing.event.ResourceID;
 import lombok.SneakyThrows;
-import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,8 +60,6 @@ import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SOURCE_SCALING_ENABLED;
-
 /** Metric collector using flink rest api. */
 public abstract class ScalingMetricCollector {
     private static final Logger LOG = 
LoggerFactory.getLogger(ScalingMetricCollector.class);
@@ -249,7 +246,7 @@ public abstract class ScalingMetricCollector {
                     }
                     ScalingMetrics.computeLoadMetrics(vertexFlinkMetrics, 
vertexScalingMetrics);
 
-                    Optional<Double> lagGrowthRate =
+                    double lagGrowthRate =
                             computeLagGrowthRate(
                                     resourceID,
                                     jobVertexID,
@@ -274,30 +271,29 @@ public abstract class ScalingMetricCollector {
         return out;
     }
 
-    @NotNull
-    private Optional<Double> computeLagGrowthRate(
+    private double computeLagGrowthRate(
             ResourceID resourceID, JobVertexID jobVertexID, Double currentLag) 
{
         var metricHistory = histories.get(resourceID);
 
         if (metricHistory == null || metricHistory.isEmpty()) {
-            return Optional.empty();
+            return Double.NaN;
         }
 
         var lastCollectionTime = metricHistory.lastKey();
         var lastCollectedMetrics = 
metricHistory.get(lastCollectionTime).get(jobVertexID);
 
         if (lastCollectedMetrics == null) {
-            return Optional.empty();
+            return Double.NaN;
         }
 
         var lastLag = lastCollectedMetrics.get(ScalingMetric.LAG);
 
         if (lastLag == null || currentLag == null) {
-            return Optional.empty();
+            return Double.NaN;
         }
 
         var timeDiff = Duration.between(lastCollectionTime, 
clock.instant()).toSeconds();
-        return Optional.of((currentLag - lastLag) / timeDiff);
+        return (currentLag - lastLag) / timeDiff;
     }
 
     /** Query the available metric names for each job vertex for the current 
spec generation. */
@@ -373,20 +369,25 @@ public abstract class ScalingMetricCollector {
 
         if (topology.isSource(jobVertexID)) {
             
requiredMetrics.add(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
-            if (conf.getBoolean(SOURCE_SCALING_ENABLED)) {
-                requiredMetrics.add(FlinkMetric.PENDING_RECORDS);
-            } else {
-                FlinkMetric.PENDING_RECORDS
-                        .findAny(allMetricNames)
-                        .ifPresent(m -> filteredMetrics.put(m, 
FlinkMetric.PENDING_RECORDS));
-                FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC
-                        .findAny(allMetricNames)
-                        .ifPresent(
-                                m ->
-                                        filteredMetrics.put(
-                                                m,
-                                                
FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC));
-            }
+            // Pending records metric won't be available for some sources.
+            // The Kafka source, for instance, lazily initializes this metric 
on receiving
+            // the first record. If this is a fresh topic or no new data has 
been read since
+            // the last checkpoint, the pendingRecords metrics won't be 
available. Also, legacy
+            // sources do not have this metric.
+            Optional<String> pendingRecordsMetric =
+                    FlinkMetric.PENDING_RECORDS.findAny(allMetricNames);
+            pendingRecordsMetric.ifPresentOrElse(
+                    m -> filteredMetrics.put(m, FlinkMetric.PENDING_RECORDS),
+                    () ->
+                            LOG.warn(
+                                    "pendingRecords metric for {} could not be 
found. Either a legacy source or an idle source. Assuming no pending records.",
+                                    jobVertexID));
+            FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC
+                    .findAny(allMetricNames)
+                    .ifPresent(
+                            m ->
+                                    filteredMetrics.put(
+                                            m, 
FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC));
         } else {
             // Not a source so we must have numRecordsInPerSecond
             requiredMetrics.add(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
@@ -402,14 +403,6 @@ public abstract class ScalingMetricCollector {
             if (flinkMetricName.isPresent()) {
                 // Add actual Flink metric name to list
                 filteredMetrics.put(flinkMetricName.get(), flinkMetric);
-            } else if (flinkMetric == FlinkMetric.PENDING_RECORDS) {
-                // Pending records metric won't be available for some sources.
-                // The Kafka source, for instance, lazily initializes this 
metric on receiving
-                // the first record. If this is a fresh topic or no new data 
has been read since
-                // the last checkpoint, the pendingRecords metrics won't be 
available.
-                LOG.warn(
-                        "pendingRecords metric for {} could not be found. This 
usually means the source hasn't read data. Assuming 0 pending records.",
-                        jobVertexID);
             } else {
                 throw new RuntimeException(
                         "Could not find required metric "
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
index 7110fd5e..1116c0a5 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
@@ -205,11 +205,7 @@ public class ScalingMetricEvaluator {
         if (topology.isSource(vertex)) {
             double catchUpTargetSec = 
conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds();
 
-            var sourceRateMetric =
-                    latestVertexMetrics.containsKey(TARGET_DATA_RATE)
-                            ? TARGET_DATA_RATE
-                            : SOURCE_DATA_RATE;
-            if (!latestVertexMetrics.containsKey(sourceRateMetric)) {
+            if (!latestVertexMetrics.containsKey(SOURCE_DATA_RATE)) {
                 throw new RuntimeException(
                         "Cannot evaluate metrics without source target rate 
information");
             }
@@ -217,8 +213,8 @@ public class ScalingMetricEvaluator {
             out.put(
                     TARGET_DATA_RATE,
                     new EvaluatedScalingMetric(
-                            latestVertexMetrics.get(sourceRateMetric),
-                            getAverage(sourceRateMetric, vertex, 
metricsHistory)));
+                            latestVertexMetrics.get(SOURCE_DATA_RATE),
+                            getAverage(SOURCE_DATA_RATE, vertex, 
metricsHistory)));
 
             double lag = latestVertexMetrics.getOrDefault(LAG, 0.);
             double catchUpInputRate = catchUpTargetSec == 0 ? 0 : lag / 
catchUpTargetSec;
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
index 64643433..4d496f02 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
@@ -59,15 +59,6 @@ public class AutoScalerOptions {
                     .withDescription(
                             "Stabilization period in which no new scaling will 
be executed");
 
-    public static final ConfigOption<Boolean> SOURCE_SCALING_ENABLED =
-            autoScalerConfig("scaling.sources.enabled")
-                    .booleanType()
-                    .defaultValue(true)
-                    .withDescription(
-                            "Whether to enable scaling source vertices. "
-                                    + "Source vertices set the baseline 
ingestion rate for the processing based on the backlog size. "
-                                    + "If disabled, only regular job vertices 
will be scaled and source vertices will be unchanged.");
-
     public static final ConfigOption<Double> TARGET_UTILIZATION =
             autoScalerConfig("target.utilization")
                     .doubleType()
@@ -166,7 +157,7 @@ public class AutoScalerOptions {
             autoScalerConfig("vertex.exclude.ids")
                     .stringType()
                     .asList()
-                    .defaultValues(new String[0])
+                    .defaultValues()
                     .withDescription(
                             "A (semicolon-separated) list of vertex ids in 
hexstring for which to disable scaling. Caution: For non-sink vertices this 
will still scale their downstream operators until 
https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
 }
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
index 07765fe8..c79d4b7d 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
@@ -19,7 +19,6 @@ package 
org.apache.flink.kubernetes.operator.autoscaler.metrics;
 
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 
-import java.util.Optional;
 import java.util.function.Function;
 
 /** Enum specifying which aggregator to use when getting a metric value. */
@@ -34,7 +33,11 @@ public enum MetricAggregator {
         this.getter = getter;
     }
 
-    public Optional<Double> get(AggregatedMetric metric) {
-        return Optional.ofNullable(metric).map(getter).filter(d -> !d.isNaN());
+    public double get(AggregatedMetric metric) {
+        if (metric != null) {
+            return getter.apply(metric);
+        } else {
+            return Double.NaN;
+        }
     }
 }
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
index 05a947d9..bf508c71 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
@@ -27,10 +27,10 @@ import org.apache.commons.math3.util.Precision;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Map;
-import java.util.Optional;
-
-import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SOURCE_SCALING_ENABLED;
+import java.util.Set;
 
 /** Utilities for computing scaling metrics based on Flink metrics. */
 public class ScalingMetrics {
@@ -66,85 +66,47 @@ public class ScalingMetrics {
             Map<FlinkMetric, AggregatedMetric> flinkMetrics,
             Map<ScalingMetric, Double> scalingMetrics,
             JobTopology topology,
-            Optional<Double> lagGrowthOpt,
+            double lagGrowthRate,
             Configuration conf) {
 
-        var source = topology.getInputs().get(jobVertexID).isEmpty();
-        var sink = topology.getOutputs().get(jobVertexID).isEmpty();
-
-        var busyTimeAggregator = 
conf.get(AutoScalerOptions.BUSY_TIME_AGGREGATOR);
-        var busyTimeOpt = 
busyTimeAggregator.get(flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC));
-
-        if (busyTimeOpt.isEmpty()) {
-            LOG.error("Cannot compute true processing/output rate without 
busyTimeMsPerSecond");
-            return;
-        }
+        var isSource = topology.getInputs().get(jobVertexID).isEmpty();
+        var isSink = topology.getOutputs().get(jobVertexID).isEmpty();
 
-        Double numRecordsInPerSecond = getNumRecordsInPerSecond(flinkMetrics, 
jobVertexID);
-        Double outputPerSecond = getNumRecordsOutPerSecond(flinkMetrics, 
jobVertexID, sink);
+        double busyTime = getBusyTime(flinkMetrics, conf, jobVertexID);
+        double busyTimeMultiplier = 1000 / keepAboveZero(busyTime);
 
-        double busyTimeMultiplier = 1000 / keepAboveZero(busyTimeOpt.get());
+        double numRecordsInPerSecond =
+                getNumRecordsInPerSecond(flinkMetrics, jobVertexID, isSource);
+        double outputPerSecond =
+                getNumRecordsOutPerSecond(flinkMetrics, jobVertexID, isSource, 
isSink);
 
-        if (source && !conf.getBoolean(SOURCE_SCALING_ENABLED)) {
-            double sourceInputRate = numRecordsInPerSecond;
+        if (isSource) {
+            double sourceDataRate = Math.max(0, numRecordsInPerSecond + 
lagGrowthRate);
+            LOG.info("Using computed source data rate {} for {}", 
sourceDataRate, jobVertexID);
+            scalingMetrics.put(ScalingMetric.SOURCE_DATA_RATE, sourceDataRate);
+            scalingMetrics.put(ScalingMetric.CURRENT_PROCESSING_RATE, 
numRecordsInPerSecond);
+        }
 
-            double targetDataRate;
-            if (!Double.isNaN(sourceInputRate) && sourceInputRate > 0) {
-                targetDataRate = sourceInputRate;
-            } else {
-                // If source in metric is not available (maybe legacy source) 
we use source
-                // output that should always be available
-                targetDataRate =
-                        
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC).getSum();
+        if (!Double.isNaN(numRecordsInPerSecond)) {
+            double trueProcessingRate = busyTimeMultiplier * 
numRecordsInPerSecond;
+            if (trueProcessingRate <= 0 || 
!Double.isFinite(trueProcessingRate)) {
+                trueProcessingRate = Double.NaN;
             }
-            scalingMetrics.put(ScalingMetric.CURRENT_PROCESSING_RATE, 
targetDataRate);
-            scalingMetrics.put(ScalingMetric.TRUE_PROCESSING_RATE, Double.NaN);
-            scalingMetrics.put(ScalingMetric.OUTPUT_RATIO, outputPerSecond / 
targetDataRate);
-            var trueOutputRate = busyTimeMultiplier * outputPerSecond;
-            scalingMetrics.put(ScalingMetric.TRUE_OUTPUT_RATE, trueOutputRate);
-            scalingMetrics.put(ScalingMetric.TARGET_DATA_RATE, trueOutputRate);
-            LOG.info(
-                    "Scaling disabled for source {} using output rate {} as 
target",
-                    jobVertexID,
-                    trueOutputRate);
+            scalingMetrics.put(ScalingMetric.TRUE_PROCESSING_RATE, 
trueProcessingRate);
+            scalingMetrics.put(ScalingMetric.CURRENT_PROCESSING_RATE, 
numRecordsInPerSecond);
         } else {
-            if (source) {
-                if (!lagGrowthOpt.isPresent() || 
numRecordsInPerSecond.isNaN()) {
-                    LOG.error(
-                            "Cannot compute source target data rate without 
numRecordsInPerSecond and pendingRecords (lag) metric for {}.",
-                            jobVertexID);
-                    scalingMetrics.put(ScalingMetric.TARGET_DATA_RATE, 
Double.NaN);
-                } else {
-                    double sourceDataRate = Math.max(0, numRecordsInPerSecond 
+ lagGrowthOpt.get());
-                    LOG.info(
-                            "Using computed source data rate {} for {}",
-                            sourceDataRate,
-                            jobVertexID);
-                    scalingMetrics.put(ScalingMetric.SOURCE_DATA_RATE, 
sourceDataRate);
-                }
-            }
+            LOG.error("Cannot compute true processing rate without 
numRecordsInPerSecond");
+        }
 
-            if (!numRecordsInPerSecond.isNaN()) {
-                double trueProcessingRate = busyTimeMultiplier * 
numRecordsInPerSecond;
-                if (trueProcessingRate <= 0 || 
!Double.isFinite(trueProcessingRate)) {
-                    trueProcessingRate = Double.NaN;
-                }
-                scalingMetrics.put(ScalingMetric.TRUE_PROCESSING_RATE, 
trueProcessingRate);
-                scalingMetrics.put(ScalingMetric.CURRENT_PROCESSING_RATE, 
numRecordsInPerSecond);
+        if (!isSink) {
+            if (!Double.isNaN(outputPerSecond)) {
+                scalingMetrics.put(
+                        ScalingMetric.OUTPUT_RATIO, outputPerSecond / 
numRecordsInPerSecond);
+                scalingMetrics.put(
+                        ScalingMetric.TRUE_OUTPUT_RATE, busyTimeMultiplier * 
outputPerSecond);
             } else {
-                LOG.error("Cannot compute true processing rate without 
numRecordsInPerSecond");
-            }
-
-            if (!sink) {
-                if (!outputPerSecond.isNaN()) {
-                    scalingMetrics.put(
-                            ScalingMetric.OUTPUT_RATIO, outputPerSecond / 
numRecordsInPerSecond);
-                    scalingMetrics.put(
-                            ScalingMetric.TRUE_OUTPUT_RATE, busyTimeMultiplier 
* outputPerSecond);
-                } else {
-                    LOG.error(
-                            "Cannot compute processing and input rate without 
numRecordsOutPerSecond");
-                }
+                LOG.error(
+                        "Cannot compute processing and input rate without 
numRecordsOutPerSecond");
             }
         }
     }
@@ -160,13 +122,36 @@ public class ScalingMetrics {
         }
     }
 
-    private static Double getNumRecordsInPerSecond(
-            Map<FlinkMetric, AggregatedMetric> flinkMetrics, JobVertexID 
jobVertexID) {
+    private static double getBusyTime(
+            Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+            Configuration conf,
+            JobVertexID jobVertexId) {
+        var busyTimeAggregator = 
conf.get(AutoScalerOptions.BUSY_TIME_AGGREGATOR);
+        var busyTime = 
busyTimeAggregator.get(flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC));
+        if (!Double.isFinite(busyTime)) {
+            LOG.error(
+                    "No busyTimeMsPerSecond metric available for {}. No 
scaling will be performed for this vertex.",
+                    jobVertexId);
+            excludeVertexFromScaling(conf, jobVertexId);
+            // Pretend that the load is balanced because we don't know any 
better
+            return conf.get(AutoScalerOptions.TARGET_UTILIZATION) * 1000;
+        }
+        return busyTime;
+    }
+
+    private static double getNumRecordsInPerSecond(
+            Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+            JobVertexID jobVertexID,
+            boolean isSource) {
         var numRecordsInPerSecond = 
flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
-        if (numRecordsInPerSecond == null) {
+        if (isSource && (numRecordsInPerSecond == null || 
numRecordsInPerSecond.getSum() == 0)) {
             numRecordsInPerSecond =
                     
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
         }
+        if (isSource && (numRecordsInPerSecond == null || 
numRecordsInPerSecond.getSum() == 0)) {
+            numRecordsInPerSecond =
+                    
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
+        }
         if (numRecordsInPerSecond == null) {
             LOG.warn("Received null input rate for {}. Returning NaN.", 
jobVertexID);
             return Double.NaN;
@@ -174,26 +159,40 @@ public class ScalingMetrics {
         return keepAboveZero(numRecordsInPerSecond.getSum());
     }
 
-    private static Double getNumRecordsOutPerSecond(
+    private static double getNumRecordsOutPerSecond(
             Map<FlinkMetric, AggregatedMetric> flinkMetrics,
             JobVertexID jobVertexID,
+            boolean isSource,
             boolean isSink) {
-        AggregatedMetric aggregatedMetric = 
flinkMetrics.get(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
-        if (aggregatedMetric == null) {
-            if (!isSink) {
-                LOG.warn("Received null output rate for {}. Returning NaN.", 
jobVertexID);
+        AggregatedMetric numRecordsOutPerSecond =
+                flinkMetrics.get(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
+        if (numRecordsOutPerSecond == null) {
+            if (isSource) {
+                numRecordsOutPerSecond =
+                        
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
+            }
+            if (numRecordsOutPerSecond == null) {
+                if (!isSink) {
+                    LOG.warn("Received null output rate for {}. Returning 
NaN.", jobVertexID);
+                }
+                return Double.NaN;
             }
-            return Double.NaN;
         }
-        return keepAboveZero(aggregatedMetric.getSum());
+        return keepAboveZero(numRecordsOutPerSecond.getSum());
     }
 
-    private static Double keepAboveZero(Double number) {
-        if (number <= 0) {
-            // Make busy time really tiny but not zero
+    private static double keepAboveZero(double value) {
+        if (value <= 0) {
+            // Make value tiny but not zero
             return EFFECTIVELY_ZERO;
         }
-        return number;
+        return value;
+    }
+
+    private static void excludeVertexFromScaling(Configuration conf, 
JobVertexID jobVertexId) {
+        Set<String> excludedIds = new 
HashSet<>(conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS));
+        excludedIds.add(jobVertexId.toHexString());
+        conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, new 
ArrayList<>(excludedIds));
     }
 
     public static double roundMetric(double value) {
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index 6cc0d839..9f93d5d5 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -409,7 +409,7 @@ public class MetricsCollectionAndEvaluationTest {
                                 new AggregatedMetric("", Double.NaN, 0.1, 
Double.NaN, Double.NaN),
                                 FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                                 new AggregatedMetric("", Double.NaN, 
Double.NaN, Double.NaN, 0.),
-                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                
FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
                                 new AggregatedMetric("", Double.NaN, 
Double.NaN, Double.NaN, 0.))));
 
         var collectedMetrics = collectMetrics();
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
index 55567cbe..62bc5a51 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
@@ -29,10 +29,10 @@ import org.junit.jupiter.api.Test;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Tests for scaling metrics computation logic. */
 public class ScalingMetricsTest {
@@ -61,7 +61,7 @@ public class ScalingMetricsTest {
                         new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 2000.)),
                 scalingMetrics,
                 topology,
-                Optional.of(15.),
+                15.,
                 new Configuration());
 
         assertEquals(
@@ -91,7 +91,7 @@ public class ScalingMetricsTest {
                         new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 2000.)),
                 scalingMetrics,
                 topology,
-                Optional.of(-50.),
+                -50.,
                 new Configuration());
 
         assertEquals(
@@ -120,7 +120,7 @@ public class ScalingMetricsTest {
                         new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 2000.)),
                 scalingMetrics,
                 topology,
-                Optional.empty(),
+                0.,
                 new Configuration());
 
         assertEquals(
@@ -150,7 +150,7 @@ public class ScalingMetricsTest {
                         new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 2000.)),
                 scalingMetrics,
                 topology,
-                Optional.empty(),
+                0.,
                 conf);
 
         assertEquals(
@@ -167,35 +167,43 @@ public class ScalingMetricsTest {
     }
 
     @Test
-    public void testSourceScalingDisabled() {
+    public void testLegacySourceScaling() {
         var source = new JobVertexID();
+        var sink = new JobVertexID();
 
-        var topology = new JobTopology(new VertexInfo(source, 
Collections.emptySet(), 1, 1));
+        var topology =
+                new JobTopology(
+                        new VertexInfo(source, Collections.emptySet(), 5, 1),
+                        new VertexInfo(sink, Collections.singleton(source), 
10, 100));
 
         Configuration conf = new Configuration();
-        // Disable scaling sources
-        conf.setBoolean(AutoScalerOptions.SOURCE_SCALING_ENABLED, false);
 
         Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
         ScalingMetrics.computeDataRateMetrics(
                 source,
                 Map.of(
+                        // Busy time is NaN for legacy sources
                         FlinkMetric.BUSY_TIME_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, 500., Double.NaN, 
Double.NaN),
+                        new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, Double.NaN),
                         FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
                         new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 2000.),
                         FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                         new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 4000.)),
                 scalingMetrics,
                 topology,
-                Optional.empty(),
+                0.,
                 conf);
 
-        // Sources are not scaled, the rates are solely computed on the basis 
of the true output
-        // rate
-        assertEquals(Double.NaN, 
scalingMetrics.get(ScalingMetric.TRUE_PROCESSING_RATE));
-        assertEquals(8000, scalingMetrics.get(ScalingMetric.TARGET_DATA_RATE));
-        assertEquals(8000, scalingMetrics.get(ScalingMetric.TRUE_OUTPUT_RATE));
+        // Make sure vertex won't be scaled
+        
assertTrue(conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS).contains(source.toHexString()));
+        // Legacy source rates are computed based on the current rate and a 
balanced utilization
+        assertEquals(
+                2000 / conf.get(AutoScalerOptions.TARGET_UTILIZATION),
+                scalingMetrics.get(ScalingMetric.TRUE_PROCESSING_RATE));
+        assertEquals(2000, scalingMetrics.get(ScalingMetric.SOURCE_DATA_RATE));
+        assertEquals(
+                scalingMetrics.get(ScalingMetric.TRUE_PROCESSING_RATE) * 2,
+                scalingMetrics.get(ScalingMetric.TRUE_OUTPUT_RATE));
         assertEquals(2, scalingMetrics.get(ScalingMetric.OUTPUT_RATIO));
     }
 
@@ -287,13 +295,13 @@ public class ScalingMetricsTest {
                 Map.of(
                         FlinkMetric.BUSY_TIME_PER_SEC,
                         new AggregatedMetric("", Double.NaN, busyness, 
Double.NaN, Double.NaN),
-                        FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                        FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
                         new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, rate),
                         FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                         new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, rate)),
                 scalingMetrics,
                 topology,
-                Optional.of(0.),
+                0.,
                 new Configuration());
 
         return scalingMetrics;

Reply via email to