This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new ff4bbd2a [FLINK-31827] Compute output ratio per edge
ff4bbd2a is described below

commit ff4bbd2a7bbed4ba0b1443d53731c883a230b6d4
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Apr 20 11:16:02 2023 +0200

    [FLINK-31827] Compute output ratio per edge
---
 .../operator/autoscaler/AutoScalerInfo.java        |  23 +--
 .../operator/autoscaler/JobVertexScaler.java       |   6 +-
 .../autoscaler/ScalingMetricCollector.java         |  35 +++--
 .../autoscaler/ScalingMetricEvaluator.java         |  58 ++++----
 ...tedMetrics.java => CollectedMetricHistory.java} |   6 +-
 .../autoscaler/metrics/CollectedMetrics.java       |  17 +--
 .../metrics/{CollectedMetrics.java => Edge.java}   |  21 ++-
 .../operator/autoscaler/metrics/ScalingMetric.java |   6 -
 .../autoscaler/metrics/ScalingMetrics.java         | 123 ++++++++++------
 ...SerDeModule.java => AutoScalerSerDeModule.java} |  25 +++-
 .../operator/autoscaler/AutoScalerInfoTest.java    |  19 ++-
 .../MetricsCollectionAndEvaluationTest.java        |   4 +-
 .../autoscaler/ScalingMetricEvaluatorTest.java     | 156 ++++++++++-----------
 .../autoscaler/metrics/ScalingMetricsTest.java     | 147 ++++++++++++-------
 14 files changed, 387 insertions(+), 259 deletions(-)

diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
index a5462219..68bfc731 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
@@ -21,12 +21,13 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import 
org.apache.flink.kubernetes.operator.autoscaler.utils.JobVertexSerDeModule;
+import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
+import 
org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerSerDeModule;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.Preconditions;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
@@ -71,7 +72,7 @@ public class AutoScalerInfo {
     protected static final ObjectMapper YAML_MAPPER =
             new ObjectMapper(yamlFactory())
                     .registerModule(new JavaTimeModule())
-                    .registerModule(new JobVertexSerDeModule());
+                    .registerModule(new AutoScalerSerDeModule());
 
     private final ConfigMap configMap;
     private Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
scalingHistory;
@@ -86,20 +87,24 @@ public class AutoScalerInfo {
         configMap.setData(Preconditions.checkNotNull(data));
     }
 
-    @SneakyThrows
-    public SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> 
getMetricHistory() {
+    public SortedMap<Instant, CollectedMetrics> getMetricHistory() {
         var historyYaml = configMap.getData().get(COLLECTED_METRICS_KEY);
         if (historyYaml == null) {
             return new TreeMap<>();
         }
 
-        return YAML_MAPPER.readValue(decompress(historyYaml), new 
TypeReference<>() {});
+        try {
+            return YAML_MAPPER.readValue(decompress(historyYaml), new 
TypeReference<>() {});
+        } catch (JsonProcessingException e) {
+            LOG.error(
+                    "Could not deserialize metric history, possibly the format 
changed. Discarding...");
+            return new TreeMap<>();
+        }
     }
 
     @SneakyThrows
     public void updateMetricHistory(
-            Instant jobUpdateTs,
-            SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> 
history) {
+            Instant jobUpdateTs, SortedMap<Instant, CollectedMetrics> history) 
{
 
         configMap
                 .getData()
@@ -191,7 +196,7 @@ public class AutoScalerInfo {
         int scalingHistorySize = data.getOrDefault(SCALING_HISTORY_KEY, 
"").length();
         int metricHistorySize = data.getOrDefault(COLLECTED_METRICS_KEY, 
"").length();
 
-        SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> 
metricHistory = null;
+        SortedMap<Instant, CollectedMetrics> metricHistory = null;
         while (scalingHistorySize + metricHistorySize > MAX_CM_BYTES) {
             if (metricHistory == null) {
                 metricHistory = getMetricHistory();
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
index 31a0ed8d..f0d6a183 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
@@ -207,7 +207,11 @@ public class JobVertexScaler {
                 message);
 
         if 
(conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
-            LOG.info(message);
+            LOG.info(
+                    "Ineffective scaling detected for {}, expected increase 
{}, actual {}",
+                    vertex,
+                    expectedIncrease,
+                    actualIncrease);
             return true;
         } else {
             return false;
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index 38868c6b..c4493795 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -24,6 +24,7 @@ import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
 import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
@@ -67,14 +68,14 @@ public abstract class ScalingMetricCollector {
     private final Map<ResourceID, Tuple2<Long, Map<JobVertexID, Map<String, 
FlinkMetric>>>>
             availableVertexMetricNames = new ConcurrentHashMap<>();
 
-    private final Map<ResourceID, SortedMap<Instant, Map<JobVertexID, 
Map<ScalingMetric, Double>>>>
-            histories = new ConcurrentHashMap<>();
+    private final Map<ResourceID, SortedMap<Instant, CollectedMetrics>> 
histories =
+            new ConcurrentHashMap<>();
 
     private final Map<ResourceID, JobTopology> topologies = new 
ConcurrentHashMap<>();
 
     private Clock clock = Clock.systemDefaultZone();
 
-    public CollectedMetrics updateMetrics(
+    public CollectedMetricHistory updateMetrics(
             AbstractFlinkResource<?, ?> cr,
             AutoScalerInfo scalingInformation,
             FlinkService flinkService,
@@ -99,7 +100,7 @@ public abstract class ScalingMetricCollector {
         if (now.isBefore(stableTime)) {
             // As long as we are stabilizing, collect no metrics at all
             LOG.info("Skipping metric collection during stabilization period 
until {}", stableTime);
-            return new CollectedMetrics(topology, 
Collections.emptySortedMap());
+            return new CollectedMetricHistory(topology, 
Collections.emptySortedMap());
         }
 
         // Adjust the window size until it reaches the max size
@@ -142,10 +143,10 @@ public abstract class ScalingMetricCollector {
             LOG.info(
                     "Waiting until {} so the initial metric window is full 
before starting scaling",
                     windowFullTime);
-            return new CollectedMetrics(topology, 
Collections.emptySortedMap());
+            return new CollectedMetricHistory(topology, 
Collections.emptySortedMap());
         }
 
-        return new CollectedMetrics(topology, scalingMetricHistory);
+        return new CollectedMetricHistory(topology, scalingMetricHistory);
     }
 
     protected JobTopology getJobTopology(
@@ -225,7 +226,7 @@ public abstract class ScalingMetricCollector {
      * @param collectedMetrics Collected metrics for all job vertices.
      * @return Computed scaling metrics for all job vertices.
      */
-    private Map<JobVertexID, Map<ScalingMetric, Double>> 
convertToScalingMetrics(
+    private CollectedMetrics convertToScalingMetrics(
             ResourceID resourceID,
             Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> 
collectedMetrics,
             JobTopology jobTopology,
@@ -268,7 +269,9 @@ public abstract class ScalingMetricCollector {
                             "Vertex scaling metrics for {}: {}", jobVertexID, 
vertexScalingMetrics);
                 });
 
-        return out;
+        var outputRatios = 
ScalingMetrics.computeOutputRatios(collectedMetrics, jobTopology);
+        LOG.debug("Output ratios: {}", outputRatios);
+        return new CollectedMetrics(out, outputRatios);
     }
 
     private double computeLagGrowthRate(
@@ -280,7 +283,8 @@ public abstract class ScalingMetricCollector {
         }
 
         var lastCollectionTime = metricHistory.lastKey();
-        var lastCollectedMetrics = 
metricHistory.get(lastCollectionTime).get(jobVertexID);
+        var lastCollectedMetrics =
+                
metricHistory.get(lastCollectionTime).getVertexMetrics().get(jobVertexID);
 
         if (lastCollectedMetrics == null) {
             return Double.NaN;
@@ -328,7 +332,7 @@ public abstract class ScalingMetricCollector {
                                             v -> v,
                                             v ->
                                                     
getFilteredVertexMetricNames(
-                                                            restClient, jobId, 
v, topology, conf)));
+                                                            restClient, jobId, 
v, topology)));
             availableVertexMetricNames.put(
                     ResourceID.fromResource(cr), Tuple2.of(deployedGeneration, 
names));
             return names;
@@ -357,8 +361,7 @@ public abstract class ScalingMetricCollector {
             RestClusterClient<?> restClient,
             JobID jobID,
             JobVertexID jobVertexID,
-            JobTopology topology,
-            Configuration conf) {
+            JobTopology topology) {
 
         var allMetricNames = queryAggregatedMetricNames(restClient, jobID, 
jobVertexID);
 
@@ -405,10 +408,7 @@ public abstract class ScalingMetricCollector {
                 filteredMetrics.put(flinkMetricName.get(), flinkMetric);
             } else {
                 throw new RuntimeException(
-                        "Could not find required metric "
-                                + flinkMetric.name()
-                                + " for "
-                                + jobVertexID);
+                        "Could not find required metric " + flinkMetric + " 
for " + jobVertexID);
             }
         }
 
@@ -460,8 +460,7 @@ public abstract class ScalingMetricCollector {
     }
 
     @VisibleForTesting
-    protected Map<ResourceID, SortedMap<Instant, Map<JobVertexID, 
Map<ScalingMetric, Double>>>>
-            getHistories() {
+    protected Map<ResourceID, SortedMap<Instant, CollectedMetrics>> 
getHistories() {
         return histories;
     }
 
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
index b167d6bb..5b4d3b3a 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
@@ -20,7 +20,9 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
 import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge;
 import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
@@ -44,13 +46,11 @@ import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMet
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.OUTPUT_RATIO;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_OUTPUT_RATE;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 
 /** Job scaling evaluator for autoscaler. */
@@ -59,7 +59,7 @@ public class ScalingMetricEvaluator {
     private static final Logger LOG = 
LoggerFactory.getLogger(ScalingMetricEvaluator.class);
 
     public Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
evaluate(
-            Configuration conf, CollectedMetrics collectedMetrics) {
+            Configuration conf, CollectedMetricHistory collectedMetrics) {
 
         var scalingOutput = new HashMap<JobVertexID, Map<ScalingMetric, 
EvaluatedScalingMetric>>();
         var metricsHistory = collectedMetrics.getMetricHistory();
@@ -85,9 +85,9 @@ public class ScalingMetricEvaluator {
     @VisibleForTesting
     protected static boolean isProcessingBacklog(
             JobTopology topology,
-            SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> 
metricsHistory,
+            SortedMap<Instant, CollectedMetrics> metricsHistory,
             Configuration conf) {
-        var lastMetrics = metricsHistory.get(metricsHistory.lastKey());
+        var lastMetrics = 
metricsHistory.get(metricsHistory.lastKey()).getVertexMetrics();
         return topology.getVerticesInTopologicalOrder().stream()
                 .filter(topology::isSource)
                 .anyMatch(
@@ -113,12 +113,13 @@ public class ScalingMetricEvaluator {
     private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
             Configuration conf,
             HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
scalingOutput,
-            SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> 
metricsHistory,
+            SortedMap<Instant, CollectedMetrics> metricsHistory,
             JobTopology topology,
             JobVertexID vertex,
             boolean processingBacklog) {
 
-        var latestVertexMetrics = 
metricsHistory.get(metricsHistory.lastKey()).get(vertex);
+        var latestVertexMetrics =
+                
metricsHistory.get(metricsHistory.lastKey()).getVertexMetrics().get(vertex);
 
         var evaluatedMetrics = new HashMap<ScalingMetric, 
EvaluatedScalingMetric>();
         computeTargetDataRate(
@@ -143,21 +144,6 @@ public class ScalingMetricEvaluator {
                 
EvaluatedScalingMetric.of(topology.getMaxParallelisms().get(vertex)));
 
         computeProcessingRateThresholds(evaluatedMetrics, conf, 
processingBacklog);
-
-        var isSink = topology.getOutputs().get(vertex).isEmpty();
-        if (!isSink) {
-            evaluatedMetrics.put(
-                    TRUE_OUTPUT_RATE,
-                    new EvaluatedScalingMetric(
-                            latestVertexMetrics.get(TRUE_OUTPUT_RATE),
-                            getAverage(TRUE_OUTPUT_RATE, vertex, 
metricsHistory)));
-            evaluatedMetrics.put(
-                    OUTPUT_RATIO,
-                    new EvaluatedScalingMetric(
-                            latestVertexMetrics.get(OUTPUT_RATIO),
-                            getAverage(OUTPUT_RATIO, vertex, metricsHistory)));
-        }
-
         return evaluatedMetrics;
     }
 
@@ -198,7 +184,7 @@ public class ScalingMetricEvaluator {
             JobVertexID vertex,
             Configuration conf,
             HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
alreadyEvaluated,
-            SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> 
metricsHistory,
+            SortedMap<Instant, CollectedMetrics> metricsHistory,
             Map<ScalingMetric, Double> latestVertexMetrics,
             Map<ScalingMetric, EvaluatedScalingMetric> out) {
 
@@ -233,7 +219,8 @@ public class ScalingMetricEvaluator {
             for (var inputVertex : inputs) {
                 var inputEvaluatedMetrics = alreadyEvaluated.get(inputVertex);
                 var inputTargetRate = 
inputEvaluatedMetrics.get(TARGET_DATA_RATE);
-                var outputRateMultiplier = 
inputEvaluatedMetrics.get(OUTPUT_RATIO).getAverage();
+                var outputRateMultiplier =
+                        getAverageOutputRatio(new Edge(inputVertex, vertex), 
metricsHistory);
                 sumCurrentTargetRate += inputTargetRate.getCurrent() * 
outputRateMultiplier;
                 sumAvgTargetRate += inputTargetRate.getAverage() * 
outputRateMultiplier;
                 sumCatchUpDataRate +=
@@ -250,10 +237,10 @@ public class ScalingMetricEvaluator {
     private static double getAverage(
             ScalingMetric metric,
             JobVertexID jobVertexId,
-            SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> 
metricsHistory) {
+            SortedMap<Instant, CollectedMetrics> metricsHistory) {
         double[] metricValues =
                 metricsHistory.values().stream()
-                        .map(m -> m.get(jobVertexId))
+                        .map(m -> m.getVertexMetrics().get(jobVertexId))
                         .filter(m -> m.containsKey(metric))
                         .mapToDouble(m -> m.get(metric))
                         .filter(d -> !Double.isNaN(d))
@@ -267,4 +254,23 @@ public class ScalingMetricEvaluator {
         }
         return StatUtils.mean(metricValues);
     }
+
+    private static double getAverageOutputRatio(
+            Edge edge, SortedMap<Instant, CollectedMetrics> metricsHistory) {
+        double[] metricValues =
+                metricsHistory.values().stream()
+                        .map(m -> m.getOutputRatios())
+                        .filter(m -> m.containsKey(edge))
+                        .mapToDouble(m -> m.get(edge))
+                        .filter(d -> !Double.isNaN(d))
+                        .toArray();
+        for (double metricValue : metricValues) {
+            if (Double.isInfinite(metricValue)) {
+                // As long as infinite values are present, we can't properly 
average. We need to
+                // wait until they are evicted.
+                return metricValue;
+            }
+        }
+        return StatUtils.mean(metricValues);
+    }
 }
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java
similarity index 85%
copy from 
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
copy to 
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java
index 60f1c410..33243ec0 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java
@@ -18,17 +18,15 @@
 package org.apache.flink.kubernetes.operator.autoscaler.metrics;
 
 import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import lombok.Value;
 
 import java.time.Instant;
-import java.util.Map;
 import java.util.SortedMap;
 
 /** Topology and collected metric history. */
 @Value
-public class CollectedMetrics {
+public class CollectedMetricHistory {
     JobTopology jobTopology;
-    SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> 
metricHistory;
+    SortedMap<Instant, CollectedMetrics> metricHistory;
 }
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
index 60f1c410..97e3f9fe 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
@@ -17,18 +17,19 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler.metrics;
 
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
-import lombok.Value;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 
-import java.time.Instant;
 import java.util.Map;
-import java.util.SortedMap;
 
-/** Topology and collected metric history. */
-@Value
+/** Collected scaling metrics. */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
 public class CollectedMetrics {
-    JobTopology jobTopology;
-    SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> 
metricHistory;
+    private Map<JobVertexID, Map<ScalingMetric, Double>> vertexMetrics;
+    private Map<Edge, Double> outputRatios;
 }
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java
similarity index 71%
copy from 
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
copy to 
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java
index 60f1c410..c89692a9 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java
@@ -17,18 +17,17 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler.metrics;
 
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
-import lombok.Value;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 
-import java.time.Instant;
-import java.util.Map;
-import java.util.SortedMap;
-
-/** Topology and collected metric history. */
-@Value
-public class CollectedMetrics {
-    JobTopology jobTopology;
-    SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> 
metricHistory;
+/** Collected scaling metrics. */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class Edge {
+    private JobVertexID from;
+    private JobVertexID to;
 }
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
index 1b543ed4..e41816ce 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
@@ -32,9 +32,6 @@ public enum ScalingMetric {
     /** Processing rate at full capacity (records/sec). */
     TRUE_PROCESSING_RATE(true),
 
-    /** Output rate at full capacity (records/sec). */
-    TRUE_OUTPUT_RATE(true),
-
     /** Current processing rate. */
     CURRENT_PROCESSING_RATE(true),
 
@@ -50,9 +47,6 @@ public enum ScalingMetric {
     /** Target processing rate of operators as derived from backlog 
(records/sec). */
     CATCH_UP_DATA_RATE(false),
 
-    /** Number of outputs produced on average for every input record. */
-    OUTPUT_RATIO(true),
-
     /** Total number of pending records. */
     LAG(false),
     /** Job vertex parallelism. */
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
index 54fe7bb6..0e4638c6 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -63,8 +64,7 @@ public class ScalingMetrics {
             double lagGrowthRate,
             Configuration conf) {
 
-        var isSource = topology.getInputs().get(jobVertexID).isEmpty();
-        var isSink = topology.getOutputs().get(jobVertexID).isEmpty();
+        var isSource = topology.isSource(jobVertexID);
 
         double busyTimeMsPerSecond = getBusyTimeMsPerSecond(flinkMetrics, 
conf, jobVertexID);
         double numRecordsInPerSecond =
@@ -84,23 +84,39 @@ public class ScalingMetrics {
         } else {
             LOG.error("Cannot compute true processing rate without 
numRecordsInPerSecond");
         }
+    }
 
-        if (!isSink) {
-            double numRecordsOutPerSecond =
-                    getNumRecordsOutPerSecond(
-                            flinkMetrics, jobVertexID, isSource, 
numRecordsInPerSecond);
-            if (!Double.isNaN(numRecordsOutPerSecond)) {
-                double outputRatio =
-                        computeOutputRatio(numRecordsInPerSecond, 
numRecordsOutPerSecond);
-                scalingMetrics.put(ScalingMetric.OUTPUT_RATIO, outputRatio);
-                scalingMetrics.put(
-                        ScalingMetric.TRUE_OUTPUT_RATE,
-                        computeTrueRate(numRecordsOutPerSecond, 
busyTimeMsPerSecond));
-            } else {
-                LOG.error(
-                        "Cannot compute processing and input rate without 
numRecordsOutPerSecond");
+    public static Map<Edge, Double> computeOutputRatios(
+            Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> flinkMetrics,
+            JobTopology topology) {
+
+        var out = new HashMap<Edge, Double>();
+        for (JobVertexID from : flinkMetrics.keySet()) {
+            var outputs = topology.getOutputs().get(from);
+            if (outputs.isEmpty()) {
+                continue;
+            }
+
+            double numRecordsInPerSecond =
+                    getNumRecordsInPerSecond(flinkMetrics.get(from), from, 
topology.isSource(from));
+
+            for (JobVertexID to : outputs) {
+                double outputRatio = 0;
+                // If the input rate is zero, we also need to flatten the 
output rate.
+                // Otherwise, the OUTPUT_RATIO would be outrageously large, 
leading to
+                // a rapid scale up.
+                if (numRecordsInPerSecond > 0) {
+                    double edgeOutPerSecond =
+                            computeEdgeOutPerSecond(topology, flinkMetrics, 
from, to);
+                    if (edgeOutPerSecond > 0) {
+                        outputRatio = edgeOutPerSecond / numRecordsInPerSecond;
+                    }
+                }
+                out.put(new Edge(from, to), outputRatio);
             }
         }
+
+        return out;
     }
 
     public static void computeLagMetrics(
@@ -153,37 +169,66 @@ public class ScalingMetrics {
     }
 
     private static double getNumRecordsOutPerSecond(
-            Map<FlinkMetric, AggregatedMetric> flinkMetrics,
-            JobVertexID jobVertexID,
-            boolean isSource,
-            double numRecordsInPerSecond) {
-        if (numRecordsInPerSecond <= 0) {
-            // If the input rate is zero, we also need to flatten the output 
rate.
-            // Otherwise, the OUTPUT_RATIO would be outrageously large, 
leading to
-            // a rapid scale up.
-            return 0;
-        }
+            Map<FlinkMetric, AggregatedMetric> flinkMetrics, JobVertexID 
jobVertexID) {
+
         AggregatedMetric numRecordsOutPerSecond =
                 flinkMetrics.get(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
+
         if (numRecordsOutPerSecond == null) {
-            if (isSource) {
-                numRecordsOutPerSecond =
-                        
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
-            }
-            if (numRecordsOutPerSecond == null) {
-                LOG.warn("Received null output rate for {}. Returning NaN.", 
jobVertexID);
-                return Double.NaN;
-            }
+            LOG.warn("Received null output rate for {}. Returning NaN.", 
jobVertexID);
+            return Double.NaN;
         }
         return numRecordsOutPerSecond.getSum();
     }
 
-    private static double computeOutputRatio(
-            double numRecordsInPerSecond, double numRecordsOutPerSecond) {
-        if (numRecordsInPerSecond <= 0) {
-            return 0;
+    private static double computeEdgeOutPerSecond(
+            JobTopology topology,
+            Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> flinkMetrics,
+            JobVertexID from,
+            JobVertexID to) {
+        var toMetrics = flinkMetrics.get(to);
+
+        var toVertexInputs = topology.getInputs().get(to);
+        // Case 1: Downstream vertex has a single input (from) so we can use 
the most reliable num
+        // records in
+        if (toVertexInputs.size() == 1) {
+            LOG.debug(
+                    "Computing edge ({}, {}) data rate for single input 
downstream task", from, to);
+            return getNumRecordsInPerSecond(toMetrics, to, false);
+        }
+
+        // Case 2: Downstream vertex has only inputs from upstream vertices 
which don't have other
+        // outputs
+        double numRecordsOutFromUpstreamInputs = 0;
+        for (JobVertexID input : toVertexInputs) {
+            if (input.equals(from)) {
+                // Exclude source edge because we only want to consider other 
input edges
+                continue;
+            }
+            if (topology.getOutputs().get(input).size() == 1) {
+                numRecordsOutFromUpstreamInputs +=
+                        getNumRecordsOutPerSecond(flinkMetrics.get(input), 
input);
+            } else {
+                // Input vertex has multiple outputs, cannot use this 
+information...
+                numRecordsOutFromUpstreamInputs = Double.NaN;
+                break;
+            }
+        }
+        if (!Double.isNaN(numRecordsOutFromUpstreamInputs)) {
+            LOG.debug(
+                    "Computing edge ({}, {}) data rate by subtracting upstream 
input rates",
+                    from,
+                    to);
+            return getNumRecordsInPerSecond(toMetrics, to, false) - 
numRecordsOutFromUpstreamInputs;
         }
-        return numRecordsOutPerSecond / numRecordsInPerSecond;
+        var fromMetrics = flinkMetrics.get(from);
+
+        // Case 3: We fall back simply to num records out, this is the least 
reliable
+        LOG.debug(
+                "Computing edge ({}, {}) data rate by falling back to from num 
records out",
+                from,
+                to);
+        return getNumRecordsOutPerSecond(fromMetrics, from);
     }
 
     private static double computeTrueRate(double rate, double 
busyTimeMsPerSecond) {
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java
similarity index 68%
rename from 
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java
rename to 
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java
index 5a5195a0..fad516a5 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler.utils;
 
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import com.fasterxml.jackson.core.JsonGenerator;
@@ -29,11 +30,14 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
 import java.io.IOException;
 
 /** Jackson serializer module for {@link JobVertexID}. */
-public class JobVertexSerDeModule extends SimpleModule {
+public class AutoScalerSerDeModule extends SimpleModule {
 
-    public JobVertexSerDeModule() {
+    public AutoScalerSerDeModule() {
         this.addKeySerializer(JobVertexID.class, new 
JobVertexIdKeySerializer());
         this.addKeyDeserializer(JobVertexID.class, new 
JobVertexIdKeyDeserializer());
+
+        this.addKeySerializer(Edge.class, new EdgeKeySerializer());
+        this.addKeyDeserializer(Edge.class, new EdgeKeyDeserializer());
     }
 
     private static class JobVertexIdKeySerializer extends 
JsonSerializer<JobVertexID> {
@@ -51,4 +55,21 @@ public class JobVertexSerDeModule extends SimpleModule {
             return JobVertexID.fromHexString(s);
         }
     }
+
+    private static class EdgeKeySerializer extends JsonSerializer<Edge> {
+        @Override
+        public void serialize(Edge value, JsonGenerator jgen, 
SerializerProvider provider)
+                throws IOException {
+
+            jgen.writeFieldName(value.getFrom().toHexString() + "," + 
value.getTo().toHexString());
+        }
+    }
+
+    private static class EdgeKeyDeserializer extends KeyDeserializer {
+        @Override
+        public Object deserializeKey(String s, DeserializationContext 
deserializationContext) {
+            var arr = s.split(",");
+            return new Edge(JobVertexID.fromHexString(arr[0]), 
JobVertexID.fromHexString(arr[1]));
+        }
+    }
 }
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
index cb262020..8f894643 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
 import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -28,6 +29,7 @@ import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -112,8 +114,11 @@ public class AutoScalerInfoTest {
         var jobUpdateTs = Instant.now();
         var v1 = new JobVertexID();
 
-        var metricHistory = new TreeMap<Instant, Map<JobVertexID, 
Map<ScalingMetric, Double>>>();
-        metricHistory.put(jobUpdateTs, Map.of(v1, 
Map.of(ScalingMetric.TRUE_PROCESSING_RATE, 1.)));
+        var metricHistory = new TreeMap<Instant, CollectedMetrics>();
+        metricHistory.put(
+                jobUpdateTs,
+                new CollectedMetrics(
+                        Map.of(v1, Map.of(ScalingMetric.TRUE_PROCESSING_RATE, 
1.)), Map.of()));
 
         var scalingHistory = new HashMap<JobVertexID, SortedMap<Instant, 
ScalingSummary>>();
         scalingHistory.put(v1, new TreeMap<>());
@@ -153,7 +158,7 @@ public class AutoScalerInfoTest {
         var v1 = new JobVertexID();
         Random rnd = new Random();
 
-        var metricHistory = new TreeMap<Instant, Map<JobVertexID, 
Map<ScalingMetric, Double>>>();
+        var metricHistory = new TreeMap<Instant, CollectedMetrics>();
         for (int i = 0; i < 50; i++) {
             var m = new HashMap<JobVertexID, Map<ScalingMetric, Double>>();
             for (int j = 0; j < 500; j++) {
@@ -161,7 +166,7 @@ public class AutoScalerInfoTest {
                         new JobVertexID(),
                         Map.of(ScalingMetric.TRUE_PROCESSING_RATE, 
rnd.nextDouble()));
             }
-            metricHistory.put(Instant.now(), m);
+            metricHistory.put(Instant.now(), new CollectedMetrics(m, 
Collections.emptyMap()));
         }
 
         var scalingHistory = new HashMap<JobVertexID, SortedMap<Instant, 
ScalingSummary>>();
@@ -197,4 +202,10 @@ public class AutoScalerInfoTest {
                                 + 
data.get(AutoScalerInfo.SCALING_HISTORY_KEY).length()
                         < AutoScalerInfo.MAX_CM_BYTES);
     }
+
+    @Test
+    public void testDiscardInvalidHistory() {
+        var info = new 
AutoScalerInfo(Map.of(AutoScalerInfo.COLLECTED_METRICS_KEY, "invalid"));
+        assertEquals(new TreeMap<>(), info.getMetricHistory());
+    }
 }
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index cd6fa660..eb7cf40f 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
+import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
 import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
@@ -431,7 +431,7 @@ public class MetricsCollectionAndEvaluationTest {
         assertEquals(1, scaledParallelism.get(source1));
     }
 
-    private CollectedMetrics collectMetrics() throws Exception {
+    private CollectedMetricHistory collectMetrics() throws Exception {
         conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
         conf.set(AutoScalerOptions.METRICS_WINDOW, Duration.ofSeconds(2));
 
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
index 3a535763..e1bc3d65 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
@@ -20,7 +20,9 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
 import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge;
 import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
@@ -44,12 +46,10 @@ import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerO
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.OUTPUT_RATIO;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
-import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_OUTPUT_RATE;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -70,45 +70,37 @@ public class ScalingMetricEvaluatorTest {
 
         var evaluator = new ScalingMetricEvaluator();
 
-        var metricHistory = new TreeMap<Instant, Map<JobVertexID, 
Map<ScalingMetric, Double>>>();
+        var metricHistory = new TreeMap<Instant, CollectedMetrics>();
 
         metricHistory.put(
                 Instant.now(),
-                Map.of(
-                        source,
+                new CollectedMetrics(
                         Map.of(
-                                SOURCE_DATA_RATE,
-                                100.,
-                                LAG,
-                                0.,
-                                OUTPUT_RATIO,
-                                2.,
-                                TRUE_OUTPUT_RATE,
-                                200.,
-                                TRUE_PROCESSING_RATE,
-                                200.),
-                        sink,
-                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+                                source,
+                                Map.of(SOURCE_DATA_RATE, 100., LAG, 0., 
TRUE_PROCESSING_RATE, 200.),
+                                sink,
+                                Map.of(TRUE_PROCESSING_RATE, 2000.)),
+                        Map.of(new Edge(source, sink), 2.)));
 
         metricHistory.put(
                 Instant.now(),
-                Map.of(
-                        source,
+                new CollectedMetrics(
                         Map.of(
-                                SOURCE_DATA_RATE, 200.,
-                                LAG, 1000.,
-                                OUTPUT_RATIO, 2.,
-                                TRUE_OUTPUT_RATE, 200.,
-                                TRUE_PROCESSING_RATE, 200.),
-                        sink,
-                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+                                source,
+                                Map.of(
+                                        SOURCE_DATA_RATE, 200.,
+                                        LAG, 1000.,
+                                        TRUE_PROCESSING_RATE, 200.),
+                                sink,
+                                Map.of(TRUE_PROCESSING_RATE, 2000.)),
+                        Map.of(new Edge(source, sink), 2.)));
 
         var conf = new Configuration();
 
         conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2));
         conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
         var evaluatedMetrics =
-                evaluator.evaluate(conf, new CollectedMetrics(topology, 
metricHistory));
+                evaluator.evaluate(conf, new CollectedMetricHistory(topology, 
metricHistory));
         assertEquals(
                 new EvaluatedScalingMetric(200, 150),
                 evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
@@ -123,7 +115,8 @@ public class ScalingMetricEvaluatorTest {
                 evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE));
 
         conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(1));
-        evaluatedMetrics = evaluator.evaluate(conf, new 
CollectedMetrics(topology, metricHistory));
+        evaluatedMetrics =
+                evaluator.evaluate(conf, new CollectedMetricHistory(topology, 
metricHistory));
         assertEquals(
                 new EvaluatedScalingMetric(200, 150),
                 evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
@@ -140,7 +133,8 @@ public class ScalingMetricEvaluatorTest {
         // Restart time should not affect evaluated metrics
         conf.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(2));
 
-        evaluatedMetrics = evaluator.evaluate(conf, new 
CollectedMetrics(topology, metricHistory));
+        evaluatedMetrics =
+                evaluator.evaluate(conf, new CollectedMetricHistory(topology, 
metricHistory));
         assertEquals(
                 new EvaluatedScalingMetric(200, 150),
                 evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
@@ -156,7 +150,8 @@ public class ScalingMetricEvaluatorTest {
 
         // Turn off lag based scaling
         conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
-        evaluatedMetrics = evaluator.evaluate(conf, new 
CollectedMetrics(topology, metricHistory));
+        evaluatedMetrics =
+                evaluator.evaluate(conf, new CollectedMetricHistory(topology, 
metricHistory));
         assertEquals(
                 new EvaluatedScalingMetric(200, 150),
                 evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
@@ -172,24 +167,17 @@ public class ScalingMetricEvaluatorTest {
         metricHistory.clear();
         metricHistory.put(
                 Instant.now(),
-                Map.of(
-                        source,
+                new CollectedMetrics(
                         Map.of(
-                                SOURCE_DATA_RATE,
-                                100.,
-                                LAG,
-                                0.,
-                                OUTPUT_RATIO,
-                                2.,
-                                TRUE_OUTPUT_RATE,
-                                200.,
-                                TRUE_PROCESSING_RATE,
-                                200.),
-                        sink,
-                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+                                source,
+                                Map.of(SOURCE_DATA_RATE, 100., LAG, 0., 
TRUE_PROCESSING_RATE, 200.),
+                                sink,
+                                Map.of(TRUE_PROCESSING_RATE, 2000.)),
+                        Map.of(new Edge(source, sink), 2.)));
 
         conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofMinutes(1));
-        evaluatedMetrics = evaluator.evaluate(conf, new 
CollectedMetrics(topology, metricHistory));
+        evaluatedMetrics =
+                evaluator.evaluate(conf, new CollectedMetricHistory(topology, 
metricHistory));
         assertEquals(
                 new EvaluatedScalingMetric(100, 100),
                 evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
@@ -231,65 +219,73 @@ public class ScalingMetricEvaluatorTest {
                         new VertexInfo(source, Collections.emptySet(), 1, 1),
                         new VertexInfo(sink, Set.of(source), 1, 1));
 
-        var metricHistory = new TreeMap<Instant, Map<JobVertexID, 
Map<ScalingMetric, Double>>>();
+        var metricHistory = new TreeMap<Instant, CollectedMetrics>();
 
         // 0 lag
         metricHistory.put(
                 Instant.now(),
-                Map.of(
-                        source,
-                        Map.of(LAG, 0., CURRENT_PROCESSING_RATE, 100.),
-                        sink,
-                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+                new CollectedMetrics(
+                        Map.of(
+                                source,
+                                Map.of(LAG, 0., CURRENT_PROCESSING_RATE, 100.),
+                                sink,
+                                Map.of(TRUE_PROCESSING_RATE, 2000.)),
+                        Collections.emptyMap()));
         assertFalse(ScalingMetricEvaluator.isProcessingBacklog(topology, 
metricHistory, conf));
 
         // Missing lag
         metricHistory.clear();
         metricHistory.put(
                 Instant.now(),
-                Map.of(
-                        source,
-                        Map.of(CURRENT_PROCESSING_RATE, 100.),
-                        sink,
-                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+                new CollectedMetrics(
+                        Map.of(
+                                source,
+                                Map.of(CURRENT_PROCESSING_RATE, 100.),
+                                sink,
+                                Map.of(TRUE_PROCESSING_RATE, 2000.)),
+                        Collections.emptyMap()));
 
         assertFalse(ScalingMetricEvaluator.isProcessingBacklog(topology, 
metricHistory, conf));
 
         // Catch up time is more than a minute at avg proc rate (200)
         metricHistory.put(
                 Instant.now(),
-                Map.of(
-                        source,
+                new CollectedMetrics(
                         Map.of(
-                                LAG,
-                                250.
-                                        * conf.get(
-                                                        AutoScalerOptions
-                                                                
.BACKLOG_PROCESSING_LAG_THRESHOLD)
-                                                .toSeconds(),
-                                CURRENT_PROCESSING_RATE,
-                                300.),
-                        sink,
-                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+                                source,
+                                Map.of(
+                                        LAG,
+                                        250.
+                                                * conf.get(
+                                                                
AutoScalerOptions
+                                                                        
.BACKLOG_PROCESSING_LAG_THRESHOLD)
+                                                        .toSeconds(),
+                                        CURRENT_PROCESSING_RATE,
+                                        300.),
+                                sink,
+                                Map.of(TRUE_PROCESSING_RATE, 2000.)),
+                        Collections.emptyMap()));
 
         assertTrue(ScalingMetricEvaluator.isProcessingBacklog(topology, 
metricHistory, conf));
 
         // Catch up time is less than a minute at avg proc rate (200)
         metricHistory.put(
                 Instant.now(),
-                Map.of(
-                        source,
+                new CollectedMetrics(
                         Map.of(
-                                LAG,
-                                180.
-                                        * conf.get(
-                                                        AutoScalerOptions
-                                                                
.BACKLOG_PROCESSING_LAG_THRESHOLD)
-                                                .toSeconds(),
-                                CURRENT_PROCESSING_RATE,
-                                200.),
-                        sink,
-                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+                                source,
+                                Map.of(
+                                        LAG,
+                                        180.
+                                                * conf.get(
+                                                                
AutoScalerOptions
+                                                                        
.BACKLOG_PROCESSING_LAG_THRESHOLD)
+                                                        .toSeconds(),
+                                        CURRENT_PROCESSING_RATE,
+                                        200.),
+                                sink,
+                                Map.of(TRUE_PROCESSING_RATE, 2000.)),
+                        Collections.emptyMap()));
         assertFalse(ScalingMetricEvaluator.isProcessingBacklog(topology, 
metricHistory, conf));
     }
 
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
index c9aa95b2..4783bd98 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
@@ -57,9 +57,9 @@ public class ScalingMetricsTest {
                         FlinkMetric.BUSY_TIME_PER_SEC,
                         new AggregatedMetric("", Double.NaN, 100., Double.NaN, 
Double.NaN),
                         FlinkMetric.NUM_RECORDS_IN_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 1000.),
+                        aggSum(1000.),
                         FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 2000.)),
+                        aggSum(2000.)),
                 scalingMetrics,
                 topology,
                 15.,
@@ -69,10 +69,6 @@ public class ScalingMetricsTest {
                 Map.of(
                         ScalingMetric.TRUE_PROCESSING_RATE,
                         10000.,
-                        ScalingMetric.TRUE_OUTPUT_RATE,
-                        20000.,
-                        ScalingMetric.OUTPUT_RATIO,
-                        2.,
                         ScalingMetric.SOURCE_DATA_RATE,
                         1015.,
                         ScalingMetric.CURRENT_PROCESSING_RATE,
@@ -87,9 +83,9 @@ public class ScalingMetricsTest {
                         FlinkMetric.BUSY_TIME_PER_SEC,
                         new AggregatedMetric("", Double.NaN, 100., Double.NaN, 
Double.NaN),
                         FlinkMetric.NUM_RECORDS_IN_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 1000.),
+                        aggSum(1000.),
                         FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 2000.)),
+                        aggSum(2000.)),
                 scalingMetrics,
                 topology,
                 -50.,
@@ -99,10 +95,6 @@ public class ScalingMetricsTest {
                 Map.of(
                         ScalingMetric.TRUE_PROCESSING_RATE,
                         10000.,
-                        ScalingMetric.TRUE_OUTPUT_RATE,
-                        20000.,
-                        ScalingMetric.OUTPUT_RATIO,
-                        2.,
                         ScalingMetric.SOURCE_DATA_RATE,
                         950.,
                         ScalingMetric.CURRENT_PROCESSING_RATE,
@@ -116,9 +108,9 @@ public class ScalingMetricsTest {
                         FlinkMetric.BUSY_TIME_PER_SEC,
                         new AggregatedMetric("", Double.NaN, 100., Double.NaN, 
Double.NaN),
                         FlinkMetric.NUM_RECORDS_IN_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 1000.),
+                        aggSum(1000.),
                         FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 2000.)),
+                        aggSum(2000.)),
                 scalingMetrics,
                 topology,
                 0.,
@@ -128,10 +120,6 @@ public class ScalingMetricsTest {
                 Map.of(
                         ScalingMetric.TRUE_PROCESSING_RATE,
                         10000.,
-                        ScalingMetric.TRUE_OUTPUT_RATE,
-                        20000.,
-                        ScalingMetric.OUTPUT_RATIO,
-                        2.,
                         ScalingMetric.CURRENT_PROCESSING_RATE,
                         1000.),
                 scalingMetrics);
@@ -146,9 +134,9 @@ public class ScalingMetricsTest {
                         FlinkMetric.BUSY_TIME_PER_SEC,
                         new AggregatedMetric("", Double.NaN, Double.NaN, 100., 
Double.NaN),
                         FlinkMetric.NUM_RECORDS_IN_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 1000.),
+                        aggSum(1000.),
                         FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 2000.)),
+                        aggSum(2000.)),
                 scalingMetrics,
                 topology,
                 0.,
@@ -158,10 +146,6 @@ public class ScalingMetricsTest {
                 Map.of(
                         ScalingMetric.TRUE_PROCESSING_RATE,
                         10000.,
-                        ScalingMetric.TRUE_OUTPUT_RATE,
-                        20000.,
-                        ScalingMetric.OUTPUT_RATIO,
-                        2.,
                         ScalingMetric.CURRENT_PROCESSING_RATE,
                         1000.),
                 scalingMetrics);
@@ -187,11 +171,11 @@ public class ScalingMetricsTest {
                 Map.of(
                         // Busy time is NaN for legacy sources
                         FlinkMetric.BUSY_TIME_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, Double.NaN),
+                        aggSum(Double.NaN),
                         FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 2000.),
+                        aggSum(2000.),
                         FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 4000.)),
+                        aggSum(4000.)),
                 scalingMetrics,
                 topology,
                 0.,
@@ -206,10 +190,6 @@ public class ScalingMetricsTest {
                 2000 / conf.get(AutoScalerOptions.TARGET_UTILIZATION),
                 scalingMetrics.get(ScalingMetric.TRUE_PROCESSING_RATE));
         assertEquals(2000, scalingMetrics.get(ScalingMetric.SOURCE_DATA_RATE));
-        assertEquals(
-                scalingMetrics.get(ScalingMetric.TRUE_PROCESSING_RATE) * 2,
-                scalingMetrics.get(ScalingMetric.TRUE_OUTPUT_RATE));
-        assertEquals(2, scalingMetrics.get(ScalingMetric.OUTPUT_RATIO));
     }
 
     @Test
@@ -235,10 +215,6 @@ public class ScalingMetricsTest {
                         ScalingMetric.TRUE_PROCESSING_RATE,
                         // When not busy at all, we have infinite processing 
power
                         Double.POSITIVE_INFINITY,
-                        ScalingMetric.TRUE_OUTPUT_RATE,
-                        Double.POSITIVE_INFINITY,
-                        ScalingMetric.OUTPUT_RATIO,
-                        1.,
                         ScalingMetric.SOURCE_DATA_RATE,
                         dataRate,
                         ScalingMetric.CURRENT_PROCESSING_RATE,
@@ -256,11 +232,6 @@ public class ScalingMetricsTest {
                         ScalingMetric.TRUE_PROCESSING_RATE,
                         // When no records are coming in, we assume infinite 
processing power
                         Double.POSITIVE_INFINITY,
-                        ScalingMetric.TRUE_OUTPUT_RATE,
-                        Double.POSITIVE_INFINITY,
-                        ScalingMetric.OUTPUT_RATIO,
-                        // We are not producing any records
-                        0.,
                         ScalingMetric.SOURCE_DATA_RATE,
                         0.,
                         ScalingMetric.CURRENT_PROCESSING_RATE,
@@ -274,13 +245,9 @@ public class ScalingMetricsTest {
         assertEquals(
                 Map.of(
                         // If there is zero input the out ratio must be zero
-                        ScalingMetric.OUTPUT_RATIO,
-                        0.,
                         ScalingMetric.TRUE_PROCESSING_RATE,
                         // When no records are coming in, we assume infinite 
processing power
                         Double.POSITIVE_INFINITY,
-                        ScalingMetric.TRUE_OUTPUT_RATE,
-                        Double.POSITIVE_INFINITY,
                         ScalingMetric.SOURCE_DATA_RATE,
                         0.,
                         ScalingMetric.CURRENT_PROCESSING_RATE,
@@ -296,10 +263,6 @@ public class ScalingMetricsTest {
                         ScalingMetric.TRUE_PROCESSING_RATE,
                         // Nothing is coming in, we must assume infinite 
processing power
                         Double.POSITIVE_INFINITY,
-                        ScalingMetric.TRUE_OUTPUT_RATE,
-                        Double.POSITIVE_INFINITY,
-                        ScalingMetric.OUTPUT_RATIO,
-                        0.,
                         ScalingMetric.SOURCE_DATA_RATE,
                         0.,
                         ScalingMetric.CURRENT_PROCESSING_RATE,
@@ -329,7 +292,7 @@ public class ScalingMetricsTest {
                         new AggregatedMetric(
                                 "", Double.NaN, Double.NaN, Double.NaN, 
processingRate),
                         FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, outputRate)),
+                        aggSum(outputRate)),
                 scalingMetrics,
                 topology,
                 0.,
@@ -337,4 +300,90 @@ public class ScalingMetricsTest {
 
         return scalingMetrics;
     }
+
+    @Test
+    public void testComputableOutputRatios() {
+        var source1 = new JobVertexID();
+        var source2 = new JobVertexID();
+
+        var op1 = new JobVertexID();
+        var sink1 = new JobVertexID();
+
+        var topology =
+                new JobTopology(
+                        new VertexInfo(source1, Collections.emptySet(), 1, 1),
+                        new VertexInfo(source2, Collections.emptySet(), 1, 1),
+                        new VertexInfo(op1, Set.of(source1, source2), 1, 1),
+                        new VertexInfo(sink1, Set.of(op1), 1, 1));
+
+        var allMetrics = new HashMap<JobVertexID, Map<FlinkMetric, 
AggregatedMetric>>();
+        allMetrics.put(
+                source1,
+                Map.of(
+                        FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                        aggSum(100),
+                        FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                        aggSum(200)));
+        allMetrics.put(
+                source2,
+                Map.of(
+                        FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                        aggSum(100),
+                        FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                        aggSum(50)));
+
+        allMetrics.put(op1, Map.of(FlinkMetric.NUM_RECORDS_IN_PER_SEC, 
aggSum(250)));
+        allMetrics.put(sink1, Map.of(FlinkMetric.NUM_RECORDS_IN_PER_SEC, 
aggSum(50)));
+
+        assertEquals(
+                Map.of(
+                        new Edge(source1, op1), 2.,
+                        new Edge(source2, op1), 0.5,
+                        new Edge(op1, sink1), 0.2),
+                ScalingMetrics.computeOutputRatios(allMetrics, topology));
+    }
+
+    @Test
+    public void testOutputRatioFallbackToOutPerSecond() {
+        var source1 = new JobVertexID();
+        var source2 = new JobVertexID();
+
+        var op1 = new JobVertexID();
+        var op2 = new JobVertexID();
+
+        var topology =
+                new JobTopology(
+                        new VertexInfo(source1, Collections.emptySet(), 1, 1),
+                        new VertexInfo(source2, Collections.emptySet(), 1, 1),
+                        new VertexInfo(op1, Set.of(source1, source2), 1, 1),
+                        new VertexInfo(op2, Set.of(source1, source2), 1, 1));
+
+        var allMetrics = new HashMap<JobVertexID, Map<FlinkMetric, 
AggregatedMetric>>();
+        allMetrics.put(
+                source1,
+                Map.of(
+                        FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                        aggSum(100),
+                        FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                        aggSum(200)));
+        allMetrics.put(
+                source2,
+                Map.of(
+                        FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                        aggSum(100),
+                        FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                        aggSum(50)));
+
+        assertEquals(
+                Map.of(
+                        new Edge(source1, op1), 2.,
+                        new Edge(source2, op1), 0.5,
+                        new Edge(source1, op2), 2.,
+                        new Edge(source2, op2), 0.5),
+                ScalingMetrics.computeOutputRatios(allMetrics, topology));
+    }
+
+    private static AggregatedMetric aggSum(double sum) {
+        return new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 
sum);
+    }
 }

Reply via email to