This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new ff4bbd2a [FLINK-31827] Compute output ratio per edge
ff4bbd2a is described below
commit ff4bbd2a7bbed4ba0b1443d53731c883a230b6d4
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Apr 20 11:16:02 2023 +0200
[FLINK-31827] Compute output ratio per edge
---
.../operator/autoscaler/AutoScalerInfo.java | 23 +--
.../operator/autoscaler/JobVertexScaler.java | 6 +-
.../autoscaler/ScalingMetricCollector.java | 35 +++--
.../autoscaler/ScalingMetricEvaluator.java | 58 ++++----
...tedMetrics.java => CollectedMetricHistory.java} | 6 +-
.../autoscaler/metrics/CollectedMetrics.java | 17 +--
.../metrics/{CollectedMetrics.java => Edge.java} | 21 ++-
.../operator/autoscaler/metrics/ScalingMetric.java | 6 -
.../autoscaler/metrics/ScalingMetrics.java | 123 ++++++++++------
...SerDeModule.java => AutoScalerSerDeModule.java} | 25 +++-
.../operator/autoscaler/AutoScalerInfoTest.java | 19 ++-
.../MetricsCollectionAndEvaluationTest.java | 4 +-
.../autoscaler/ScalingMetricEvaluatorTest.java | 156 ++++++++++-----------
.../autoscaler/metrics/ScalingMetricsTest.java | 147 ++++++++++++-------
14 files changed, 387 insertions(+), 259 deletions(-)
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
index a5462219..68bfc731 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
@@ -21,12 +21,13 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
-import
org.apache.flink.kubernetes.operator.autoscaler.utils.JobVertexSerDeModule;
+import
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
+import
org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerSerDeModule;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
@@ -71,7 +72,7 @@ public class AutoScalerInfo {
protected static final ObjectMapper YAML_MAPPER =
new ObjectMapper(yamlFactory())
.registerModule(new JavaTimeModule())
- .registerModule(new JobVertexSerDeModule());
+ .registerModule(new AutoScalerSerDeModule());
private final ConfigMap configMap;
private Map<JobVertexID, SortedMap<Instant, ScalingSummary>>
scalingHistory;
@@ -86,20 +87,24 @@ public class AutoScalerInfo {
configMap.setData(Preconditions.checkNotNull(data));
}
- @SneakyThrows
- public SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>
getMetricHistory() {
+ public SortedMap<Instant, CollectedMetrics> getMetricHistory() {
var historyYaml = configMap.getData().get(COLLECTED_METRICS_KEY);
if (historyYaml == null) {
return new TreeMap<>();
}
- return YAML_MAPPER.readValue(decompress(historyYaml), new
TypeReference<>() {});
+ try {
+ return YAML_MAPPER.readValue(decompress(historyYaml), new
TypeReference<>() {});
+ } catch (JsonProcessingException e) {
+ LOG.error(
+ "Could not deserialize metric history, possibly the format
changed. Discarding...");
+ return new TreeMap<>();
+ }
}
@SneakyThrows
public void updateMetricHistory(
- Instant jobUpdateTs,
- SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>
history) {
+ Instant jobUpdateTs, SortedMap<Instant, CollectedMetrics> history)
{
configMap
.getData()
@@ -191,7 +196,7 @@ public class AutoScalerInfo {
int scalingHistorySize = data.getOrDefault(SCALING_HISTORY_KEY,
"").length();
int metricHistorySize = data.getOrDefault(COLLECTED_METRICS_KEY,
"").length();
- SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>
metricHistory = null;
+ SortedMap<Instant, CollectedMetrics> metricHistory = null;
while (scalingHistorySize + metricHistorySize > MAX_CM_BYTES) {
if (metricHistory == null) {
metricHistory = getMetricHistory();
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
index 31a0ed8d..f0d6a183 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
@@ -207,7 +207,11 @@ public class JobVertexScaler {
message);
if
(conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
- LOG.info(message);
+ LOG.info(
+ "Ineffective scaling detected for {}, expected increase
{}, actual {}",
+ vertex,
+ expectedIncrease,
+ actualIncrease);
return true;
} else {
return false;
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index 38868c6b..c4493795 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -24,6 +24,7 @@ import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
import
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
@@ -67,14 +68,14 @@ public abstract class ScalingMetricCollector {
private final Map<ResourceID, Tuple2<Long, Map<JobVertexID, Map<String,
FlinkMetric>>>>
availableVertexMetricNames = new ConcurrentHashMap<>();
- private final Map<ResourceID, SortedMap<Instant, Map<JobVertexID,
Map<ScalingMetric, Double>>>>
- histories = new ConcurrentHashMap<>();
+ private final Map<ResourceID, SortedMap<Instant, CollectedMetrics>>
histories =
+ new ConcurrentHashMap<>();
private final Map<ResourceID, JobTopology> topologies = new
ConcurrentHashMap<>();
private Clock clock = Clock.systemDefaultZone();
- public CollectedMetrics updateMetrics(
+ public CollectedMetricHistory updateMetrics(
AbstractFlinkResource<?, ?> cr,
AutoScalerInfo scalingInformation,
FlinkService flinkService,
@@ -99,7 +100,7 @@ public abstract class ScalingMetricCollector {
if (now.isBefore(stableTime)) {
// As long as we are stabilizing, collect no metrics at all
LOG.info("Skipping metric collection during stabilization period
until {}", stableTime);
- return new CollectedMetrics(topology,
Collections.emptySortedMap());
+ return new CollectedMetricHistory(topology,
Collections.emptySortedMap());
}
// Adjust the window size until it reaches the max size
@@ -142,10 +143,10 @@ public abstract class ScalingMetricCollector {
LOG.info(
"Waiting until {} so the initial metric window is full
before starting scaling",
windowFullTime);
- return new CollectedMetrics(topology,
Collections.emptySortedMap());
+ return new CollectedMetricHistory(topology,
Collections.emptySortedMap());
}
- return new CollectedMetrics(topology, scalingMetricHistory);
+ return new CollectedMetricHistory(topology, scalingMetricHistory);
}
protected JobTopology getJobTopology(
@@ -225,7 +226,7 @@ public abstract class ScalingMetricCollector {
* @param collectedMetrics Collected metrics for all job vertices.
* @return Computed scaling metrics for all job vertices.
*/
- private Map<JobVertexID, Map<ScalingMetric, Double>>
convertToScalingMetrics(
+ private CollectedMetrics convertToScalingMetrics(
ResourceID resourceID,
Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>>
collectedMetrics,
JobTopology jobTopology,
@@ -268,7 +269,9 @@ public abstract class ScalingMetricCollector {
"Vertex scaling metrics for {}: {}", jobVertexID,
vertexScalingMetrics);
});
- return out;
+ var outputRatios =
ScalingMetrics.computeOutputRatios(collectedMetrics, jobTopology);
+ LOG.debug("Output ratios: {}", outputRatios);
+ return new CollectedMetrics(out, outputRatios);
}
private double computeLagGrowthRate(
@@ -280,7 +283,8 @@ public abstract class ScalingMetricCollector {
}
var lastCollectionTime = metricHistory.lastKey();
- var lastCollectedMetrics =
metricHistory.get(lastCollectionTime).get(jobVertexID);
+ var lastCollectedMetrics =
+
metricHistory.get(lastCollectionTime).getVertexMetrics().get(jobVertexID);
if (lastCollectedMetrics == null) {
return Double.NaN;
@@ -328,7 +332,7 @@ public abstract class ScalingMetricCollector {
v -> v,
v ->
getFilteredVertexMetricNames(
- restClient, jobId,
v, topology, conf)));
+ restClient, jobId,
v, topology)));
availableVertexMetricNames.put(
ResourceID.fromResource(cr), Tuple2.of(deployedGeneration,
names));
return names;
@@ -357,8 +361,7 @@ public abstract class ScalingMetricCollector {
RestClusterClient<?> restClient,
JobID jobID,
JobVertexID jobVertexID,
- JobTopology topology,
- Configuration conf) {
+ JobTopology topology) {
var allMetricNames = queryAggregatedMetricNames(restClient, jobID,
jobVertexID);
@@ -405,10 +408,7 @@ public abstract class ScalingMetricCollector {
filteredMetrics.put(flinkMetricName.get(), flinkMetric);
} else {
throw new RuntimeException(
- "Could not find required metric "
- + flinkMetric.name()
- + " for "
- + jobVertexID);
+ "Could not find required metric " + flinkMetric + "
for " + jobVertexID);
}
}
@@ -460,8 +460,7 @@ public abstract class ScalingMetricCollector {
}
@VisibleForTesting
- protected Map<ResourceID, SortedMap<Instant, Map<JobVertexID,
Map<ScalingMetric, Double>>>>
- getHistories() {
+ protected Map<ResourceID, SortedMap<Instant, CollectedMetrics>>
getHistories() {
return histories;
}
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
index b167d6bb..5b4d3b3a 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
@@ -20,7 +20,9 @@ package org.apache.flink.kubernetes.operator.autoscaler;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
import
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge;
import
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
@@ -44,13 +46,11 @@ import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMet
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
-import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.OUTPUT_RATIO;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
-import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_OUTPUT_RATE;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
/** Job scaling evaluator for autoscaler. */
@@ -59,7 +59,7 @@ public class ScalingMetricEvaluator {
private static final Logger LOG =
LoggerFactory.getLogger(ScalingMetricEvaluator.class);
public Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>
evaluate(
- Configuration conf, CollectedMetrics collectedMetrics) {
+ Configuration conf, CollectedMetricHistory collectedMetrics) {
var scalingOutput = new HashMap<JobVertexID, Map<ScalingMetric,
EvaluatedScalingMetric>>();
var metricsHistory = collectedMetrics.getMetricHistory();
@@ -85,9 +85,9 @@ public class ScalingMetricEvaluator {
@VisibleForTesting
protected static boolean isProcessingBacklog(
JobTopology topology,
- SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>
metricsHistory,
+ SortedMap<Instant, CollectedMetrics> metricsHistory,
Configuration conf) {
- var lastMetrics = metricsHistory.get(metricsHistory.lastKey());
+ var lastMetrics =
metricsHistory.get(metricsHistory.lastKey()).getVertexMetrics();
return topology.getVerticesInTopologicalOrder().stream()
.filter(topology::isSource)
.anyMatch(
@@ -113,12 +113,13 @@ public class ScalingMetricEvaluator {
private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
Configuration conf,
HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>
scalingOutput,
- SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>
metricsHistory,
+ SortedMap<Instant, CollectedMetrics> metricsHistory,
JobTopology topology,
JobVertexID vertex,
boolean processingBacklog) {
- var latestVertexMetrics =
metricsHistory.get(metricsHistory.lastKey()).get(vertex);
+ var latestVertexMetrics =
+
metricsHistory.get(metricsHistory.lastKey()).getVertexMetrics().get(vertex);
var evaluatedMetrics = new HashMap<ScalingMetric,
EvaluatedScalingMetric>();
computeTargetDataRate(
@@ -143,21 +144,6 @@ public class ScalingMetricEvaluator {
EvaluatedScalingMetric.of(topology.getMaxParallelisms().get(vertex)));
computeProcessingRateThresholds(evaluatedMetrics, conf,
processingBacklog);
-
- var isSink = topology.getOutputs().get(vertex).isEmpty();
- if (!isSink) {
- evaluatedMetrics.put(
- TRUE_OUTPUT_RATE,
- new EvaluatedScalingMetric(
- latestVertexMetrics.get(TRUE_OUTPUT_RATE),
- getAverage(TRUE_OUTPUT_RATE, vertex,
metricsHistory)));
- evaluatedMetrics.put(
- OUTPUT_RATIO,
- new EvaluatedScalingMetric(
- latestVertexMetrics.get(OUTPUT_RATIO),
- getAverage(OUTPUT_RATIO, vertex, metricsHistory)));
- }
-
return evaluatedMetrics;
}
@@ -198,7 +184,7 @@ public class ScalingMetricEvaluator {
JobVertexID vertex,
Configuration conf,
HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>
alreadyEvaluated,
- SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>
metricsHistory,
+ SortedMap<Instant, CollectedMetrics> metricsHistory,
Map<ScalingMetric, Double> latestVertexMetrics,
Map<ScalingMetric, EvaluatedScalingMetric> out) {
@@ -233,7 +219,8 @@ public class ScalingMetricEvaluator {
for (var inputVertex : inputs) {
var inputEvaluatedMetrics = alreadyEvaluated.get(inputVertex);
var inputTargetRate =
inputEvaluatedMetrics.get(TARGET_DATA_RATE);
- var outputRateMultiplier =
inputEvaluatedMetrics.get(OUTPUT_RATIO).getAverage();
+ var outputRateMultiplier =
+ getAverageOutputRatio(new Edge(inputVertex, vertex),
metricsHistory);
sumCurrentTargetRate += inputTargetRate.getCurrent() *
outputRateMultiplier;
sumAvgTargetRate += inputTargetRate.getAverage() *
outputRateMultiplier;
sumCatchUpDataRate +=
@@ -250,10 +237,10 @@ public class ScalingMetricEvaluator {
private static double getAverage(
ScalingMetric metric,
JobVertexID jobVertexId,
- SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>
metricsHistory) {
+ SortedMap<Instant, CollectedMetrics> metricsHistory) {
double[] metricValues =
metricsHistory.values().stream()
- .map(m -> m.get(jobVertexId))
+ .map(m -> m.getVertexMetrics().get(jobVertexId))
.filter(m -> m.containsKey(metric))
.mapToDouble(m -> m.get(metric))
.filter(d -> !Double.isNaN(d))
@@ -267,4 +254,23 @@ public class ScalingMetricEvaluator {
}
return StatUtils.mean(metricValues);
}
+
+ private static double getAverageOutputRatio(
+ Edge edge, SortedMap<Instant, CollectedMetrics> metricsHistory) {
+ double[] metricValues =
+ metricsHistory.values().stream()
+ .map(m -> m.getOutputRatios())
+ .filter(m -> m.containsKey(edge))
+ .mapToDouble(m -> m.get(edge))
+ .filter(d -> !Double.isNaN(d))
+ .toArray();
+ for (double metricValue : metricValues) {
+ if (Double.isInfinite(metricValue)) {
+ // As long as infinite values are present, we can't properly
average. We need to
+ // wait until they are evicted.
+ return metricValue;
+ }
+ }
+ return StatUtils.mean(metricValues);
+ }
}
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java
similarity index 85%
copy from
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
copy to
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java
index 60f1c410..33243ec0 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java
@@ -18,17 +18,15 @@
package org.apache.flink.kubernetes.operator.autoscaler.metrics;
import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
import lombok.Value;
import java.time.Instant;
-import java.util.Map;
import java.util.SortedMap;
/** Topology and collected metric history. */
@Value
-public class CollectedMetrics {
+public class CollectedMetricHistory {
JobTopology jobTopology;
- SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>
metricHistory;
+ SortedMap<Instant, CollectedMetrics> metricHistory;
}
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
index 60f1c410..97e3f9fe 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
@@ -17,18 +17,19 @@
package org.apache.flink.kubernetes.operator.autoscaler.metrics;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import lombok.Value;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
-import java.time.Instant;
import java.util.Map;
-import java.util.SortedMap;
-/** Topology and collected metric history. */
-@Value
+/** Collected scaling metrics. */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
public class CollectedMetrics {
- JobTopology jobTopology;
- SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>
metricHistory;
+ private Map<JobVertexID, Map<ScalingMetric, Double>> vertexMetrics;
+ private Map<Edge, Double> outputRatios;
}
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java
similarity index 71%
copy from
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
copy to
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java
index 60f1c410..c89692a9 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java
@@ -17,18 +17,17 @@
package org.apache.flink.kubernetes.operator.autoscaler.metrics;
-import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import lombok.Value;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
-import java.time.Instant;
-import java.util.Map;
-import java.util.SortedMap;
-
-/** Topology and collected metric history. */
-@Value
-public class CollectedMetrics {
- JobTopology jobTopology;
- SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>
metricHistory;
+/** Directed edge between two job vertices in the job graph. */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class Edge {
+ private JobVertexID from;
+ private JobVertexID to;
}
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
index 1b543ed4..e41816ce 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
@@ -32,9 +32,6 @@ public enum ScalingMetric {
/** Processing rate at full capacity (records/sec). */
TRUE_PROCESSING_RATE(true),
- /** Output rate at full capacity (records/sec). */
- TRUE_OUTPUT_RATE(true),
-
/** Current processing rate. */
CURRENT_PROCESSING_RATE(true),
@@ -50,9 +47,6 @@ public enum ScalingMetric {
/** Target processing rate of operators as derived from backlog
(records/sec). */
CATCH_UP_DATA_RATE(false),
- /** Number of outputs produced on average for every input record. */
- OUTPUT_RATIO(true),
-
/** Total number of pending records. */
LAG(false),
/** Job vertex parallelism. */
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
index 54fe7bb6..0e4638c6 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -63,8 +64,7 @@ public class ScalingMetrics {
double lagGrowthRate,
Configuration conf) {
- var isSource = topology.getInputs().get(jobVertexID).isEmpty();
- var isSink = topology.getOutputs().get(jobVertexID).isEmpty();
+ var isSource = topology.isSource(jobVertexID);
double busyTimeMsPerSecond = getBusyTimeMsPerSecond(flinkMetrics,
conf, jobVertexID);
double numRecordsInPerSecond =
@@ -84,23 +84,39 @@ public class ScalingMetrics {
} else {
LOG.error("Cannot compute true processing rate without
numRecordsInPerSecond");
}
+ }
- if (!isSink) {
- double numRecordsOutPerSecond =
- getNumRecordsOutPerSecond(
- flinkMetrics, jobVertexID, isSource,
numRecordsInPerSecond);
- if (!Double.isNaN(numRecordsOutPerSecond)) {
- double outputRatio =
- computeOutputRatio(numRecordsInPerSecond,
numRecordsOutPerSecond);
- scalingMetrics.put(ScalingMetric.OUTPUT_RATIO, outputRatio);
- scalingMetrics.put(
- ScalingMetric.TRUE_OUTPUT_RATE,
- computeTrueRate(numRecordsOutPerSecond,
busyTimeMsPerSecond));
- } else {
- LOG.error(
- "Cannot compute processing and input rate without
numRecordsOutPerSecond");
+ public static Map<Edge, Double> computeOutputRatios(
+ Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> flinkMetrics,
+ JobTopology topology) {
+
+ var out = new HashMap<Edge, Double>();
+ for (JobVertexID from : flinkMetrics.keySet()) {
+ var outputs = topology.getOutputs().get(from);
+ if (outputs.isEmpty()) {
+ continue;
+ }
+
+ double numRecordsInPerSecond =
+ getNumRecordsInPerSecond(flinkMetrics.get(from), from,
topology.isSource(from));
+
+ for (JobVertexID to : outputs) {
+ double outputRatio = 0;
+ // If the input rate is zero, we also need to flatten the
output rate.
+ // Otherwise, the OUTPUT_RATIO would be outrageously large,
leading to
+ // a rapid scale up.
+ if (numRecordsInPerSecond > 0) {
+ double edgeOutPerSecond =
+ computeEdgeOutPerSecond(topology, flinkMetrics,
from, to);
+ if (edgeOutPerSecond > 0) {
+ outputRatio = edgeOutPerSecond / numRecordsInPerSecond;
+ }
+ }
+ out.put(new Edge(from, to), outputRatio);
}
}
+
+ return out;
}
public static void computeLagMetrics(
@@ -153,37 +169,66 @@ public class ScalingMetrics {
}
private static double getNumRecordsOutPerSecond(
- Map<FlinkMetric, AggregatedMetric> flinkMetrics,
- JobVertexID jobVertexID,
- boolean isSource,
- double numRecordsInPerSecond) {
- if (numRecordsInPerSecond <= 0) {
- // If the input rate is zero, we also need to flatten the output
rate.
- // Otherwise, the OUTPUT_RATIO would be outrageously large,
leading to
- // a rapid scale up.
- return 0;
- }
+ Map<FlinkMetric, AggregatedMetric> flinkMetrics, JobVertexID
jobVertexID) {
+
AggregatedMetric numRecordsOutPerSecond =
flinkMetrics.get(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
+
if (numRecordsOutPerSecond == null) {
- if (isSource) {
- numRecordsOutPerSecond =
-
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
- }
- if (numRecordsOutPerSecond == null) {
- LOG.warn("Received null output rate for {}. Returning NaN.",
jobVertexID);
- return Double.NaN;
- }
+ LOG.warn("Received null output rate for {}. Returning NaN.",
jobVertexID);
+ return Double.NaN;
}
return numRecordsOutPerSecond.getSum();
}
- private static double computeOutputRatio(
- double numRecordsInPerSecond, double numRecordsOutPerSecond) {
- if (numRecordsInPerSecond <= 0) {
- return 0;
+ private static double computeEdgeOutPerSecond(
+ JobTopology topology,
+ Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> flinkMetrics,
+ JobVertexID from,
+ JobVertexID to) {
+ var toMetrics = flinkMetrics.get(to);
+
+ var toVertexInputs = topology.getInputs().get(to);
+ // Case 1: Downstream vertex has a single input (from) so we can use
the most reliable num
+ // records in
+ if (toVertexInputs.size() == 1) {
+ LOG.debug(
+ "Computing edge ({}, {}) data rate for single input
downstream task", from, to);
+ return getNumRecordsInPerSecond(toMetrics, to, false);
+ }
+
+ // Case 2: Downstream vertex has only inputs from upstream vertices
which don't have other
+ // outputs
+ double numRecordsOutFromUpstreamInputs = 0;
+ for (JobVertexID input : toVertexInputs) {
+ if (input.equals(from)) {
+ // Exclude source edge because we only want to consider other
input edges
+ continue;
+ }
+ if (topology.getOutputs().get(input).size() == 1) {
+ numRecordsOutFromUpstreamInputs +=
+ getNumRecordsOutPerSecond(flinkMetrics.get(input),
input);
+ } else {
+ // Output vertex has multiple outputs, cannot use this
information...
+ numRecordsOutFromUpstreamInputs = Double.NaN;
+ break;
+ }
+ }
+ if (!Double.isNaN(numRecordsOutFromUpstreamInputs)) {
+ LOG.debug(
+ "Computing edge ({}, {}) data rate by subtracting upstream
input rates",
+ from,
+ to);
+ return getNumRecordsInPerSecond(toMetrics, to, false) -
numRecordsOutFromUpstreamInputs;
}
- return numRecordsOutPerSecond / numRecordsInPerSecond;
+ var fromMetrics = flinkMetrics.get(from);
+
+ // Case 3: We fall back simply to num records out, this is the least
reliable
+ LOG.debug(
+                "Computing edge ({}, {}) data rate by falling back to num
records out",
+ from,
+ to);
+ return getNumRecordsOutPerSecond(fromMetrics, from);
}
private static double computeTrueRate(double rate, double
busyTimeMsPerSecond) {
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java
similarity index 68%
rename from
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java
rename to
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java
index 5a5195a0..fad516a5 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/JobVertexSerDeModule.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.autoscaler.utils;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -29,11 +30,14 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
import java.io.IOException;
/** Jackson serializer module for {@link JobVertexID} and {@link Edge} map keys. */
-public class JobVertexSerDeModule extends SimpleModule {
+public class AutoScalerSerDeModule extends SimpleModule {
- public JobVertexSerDeModule() {
+ public AutoScalerSerDeModule() {
this.addKeySerializer(JobVertexID.class, new
JobVertexIdKeySerializer());
this.addKeyDeserializer(JobVertexID.class, new
JobVertexIdKeyDeserializer());
+
+ this.addKeySerializer(Edge.class, new EdgeKeySerializer());
+ this.addKeyDeserializer(Edge.class, new EdgeKeyDeserializer());
}
private static class JobVertexIdKeySerializer extends
JsonSerializer<JobVertexID> {
@@ -51,4 +55,21 @@ public class JobVertexSerDeModule extends SimpleModule {
return JobVertexID.fromHexString(s);
}
}
+
+ private static class EdgeKeySerializer extends JsonSerializer<Edge> {
+ @Override
+ public void serialize(Edge value, JsonGenerator jgen,
SerializerProvider provider)
+ throws IOException {
+
+ jgen.writeFieldName(value.getFrom().toHexString() + "," +
value.getTo().toHexString());
+ }
+ }
+
+ private static class EdgeKeyDeserializer extends KeyDeserializer {
+ @Override
+ public Object deserializeKey(String s, DeserializationContext
deserializationContext) {
+ var arr = s.split(",");
+ return new Edge(JobVertexID.fromHexString(arr[0]),
JobVertexID.fromHexString(arr[1]));
+ }
+ }
}
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
index cb262020..8f894643 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.autoscaler;
import org.apache.flink.configuration.Configuration;
import
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
import
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -28,6 +29,7 @@ import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -112,8 +114,11 @@ public class AutoScalerInfoTest {
var jobUpdateTs = Instant.now();
var v1 = new JobVertexID();
- var metricHistory = new TreeMap<Instant, Map<JobVertexID,
Map<ScalingMetric, Double>>>();
- metricHistory.put(jobUpdateTs, Map.of(v1,
Map.of(ScalingMetric.TRUE_PROCESSING_RATE, 1.)));
+ var metricHistory = new TreeMap<Instant, CollectedMetrics>();
+ metricHistory.put(
+ jobUpdateTs,
+ new CollectedMetrics(
+ Map.of(v1, Map.of(ScalingMetric.TRUE_PROCESSING_RATE,
1.)), Map.of()));
var scalingHistory = new HashMap<JobVertexID, SortedMap<Instant,
ScalingSummary>>();
scalingHistory.put(v1, new TreeMap<>());
@@ -153,7 +158,7 @@ public class AutoScalerInfoTest {
var v1 = new JobVertexID();
Random rnd = new Random();
- var metricHistory = new TreeMap<Instant, Map<JobVertexID,
Map<ScalingMetric, Double>>>();
+ var metricHistory = new TreeMap<Instant, CollectedMetrics>();
for (int i = 0; i < 50; i++) {
var m = new HashMap<JobVertexID, Map<ScalingMetric, Double>>();
for (int j = 0; j < 500; j++) {
@@ -161,7 +166,7 @@ public class AutoScalerInfoTest {
new JobVertexID(),
Map.of(ScalingMetric.TRUE_PROCESSING_RATE,
rnd.nextDouble()));
}
- metricHistory.put(Instant.now(), m);
+ metricHistory.put(Instant.now(), new CollectedMetrics(m,
Collections.emptyMap()));
}
var scalingHistory = new HashMap<JobVertexID, SortedMap<Instant,
ScalingSummary>>();
@@ -197,4 +202,10 @@ public class AutoScalerInfoTest {
+
data.get(AutoScalerInfo.SCALING_HISTORY_KEY).length()
< AutoScalerInfo.MAX_CM_BYTES);
}
+
+ @Test
+ public void testDiscardInvalidHistory() {
+ var info = new
AutoScalerInfo(Map.of(AutoScalerInfo.COLLECTED_METRICS_KEY, "invalid"));
+ assertEquals(new TreeMap<>(), info.getMetricHistory());
+ }
}
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index cd6fa660..eb7cf40f 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
-import
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
+import
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
import
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
@@ -431,7 +431,7 @@ public class MetricsCollectionAndEvaluationTest {
assertEquals(1, scaledParallelism.get(source1));
}
- private CollectedMetrics collectMetrics() throws Exception {
+ private CollectedMetricHistory collectMetrics() throws Exception {
conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
conf.set(AutoScalerOptions.METRICS_WINDOW, Duration.ofSeconds(2));
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
index 3a535763..e1bc3d65 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
@@ -20,7 +20,9 @@ package org.apache.flink.kubernetes.operator.autoscaler;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
import
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge;
import
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
@@ -44,12 +46,10 @@ import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerO
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
-import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.OUTPUT_RATIO;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
-import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_OUTPUT_RATE;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -70,45 +70,37 @@ public class ScalingMetricEvaluatorTest {
var evaluator = new ScalingMetricEvaluator();
- var metricHistory = new TreeMap<Instant, Map<JobVertexID,
Map<ScalingMetric, Double>>>();
+ var metricHistory = new TreeMap<Instant, CollectedMetrics>();
metricHistory.put(
Instant.now(),
- Map.of(
- source,
+ new CollectedMetrics(
Map.of(
- SOURCE_DATA_RATE,
- 100.,
- LAG,
- 0.,
- OUTPUT_RATIO,
- 2.,
- TRUE_OUTPUT_RATE,
- 200.,
- TRUE_PROCESSING_RATE,
- 200.),
- sink,
- Map.of(TRUE_PROCESSING_RATE, 2000.)));
+ source,
+ Map.of(SOURCE_DATA_RATE, 100., LAG, 0.,
TRUE_PROCESSING_RATE, 200.),
+ sink,
+ Map.of(TRUE_PROCESSING_RATE, 2000.)),
+ Map.of(new Edge(source, sink), 2.)));
metricHistory.put(
Instant.now(),
- Map.of(
- source,
+ new CollectedMetrics(
Map.of(
- SOURCE_DATA_RATE, 200.,
- LAG, 1000.,
- OUTPUT_RATIO, 2.,
- TRUE_OUTPUT_RATE, 200.,
- TRUE_PROCESSING_RATE, 200.),
- sink,
- Map.of(TRUE_PROCESSING_RATE, 2000.)));
+ source,
+ Map.of(
+ SOURCE_DATA_RATE, 200.,
+ LAG, 1000.,
+ TRUE_PROCESSING_RATE, 200.),
+ sink,
+ Map.of(TRUE_PROCESSING_RATE, 2000.)),
+ Map.of(new Edge(source, sink), 2.)));
var conf = new Configuration();
conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2));
conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
var evaluatedMetrics =
- evaluator.evaluate(conf, new CollectedMetrics(topology,
metricHistory));
+ evaluator.evaluate(conf, new CollectedMetricHistory(topology,
metricHistory));
assertEquals(
new EvaluatedScalingMetric(200, 150),
evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
@@ -123,7 +115,8 @@ public class ScalingMetricEvaluatorTest {
evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE));
conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(1));
- evaluatedMetrics = evaluator.evaluate(conf, new
CollectedMetrics(topology, metricHistory));
+ evaluatedMetrics =
+ evaluator.evaluate(conf, new CollectedMetricHistory(topology,
metricHistory));
assertEquals(
new EvaluatedScalingMetric(200, 150),
evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
@@ -140,7 +133,8 @@ public class ScalingMetricEvaluatorTest {
// Restart time should not affect evaluated metrics
conf.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(2));
- evaluatedMetrics = evaluator.evaluate(conf, new
CollectedMetrics(topology, metricHistory));
+ evaluatedMetrics =
+ evaluator.evaluate(conf, new CollectedMetricHistory(topology,
metricHistory));
assertEquals(
new EvaluatedScalingMetric(200, 150),
evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
@@ -156,7 +150,8 @@ public class ScalingMetricEvaluatorTest {
// Turn off lag based scaling
conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
- evaluatedMetrics = evaluator.evaluate(conf, new
CollectedMetrics(topology, metricHistory));
+ evaluatedMetrics =
+ evaluator.evaluate(conf, new CollectedMetricHistory(topology,
metricHistory));
assertEquals(
new EvaluatedScalingMetric(200, 150),
evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
@@ -172,24 +167,17 @@ public class ScalingMetricEvaluatorTest {
metricHistory.clear();
metricHistory.put(
Instant.now(),
- Map.of(
- source,
+ new CollectedMetrics(
Map.of(
- SOURCE_DATA_RATE,
- 100.,
- LAG,
- 0.,
- OUTPUT_RATIO,
- 2.,
- TRUE_OUTPUT_RATE,
- 200.,
- TRUE_PROCESSING_RATE,
- 200.),
- sink,
- Map.of(TRUE_PROCESSING_RATE, 2000.)));
+ source,
+ Map.of(SOURCE_DATA_RATE, 100., LAG, 0.,
TRUE_PROCESSING_RATE, 200.),
+ sink,
+ Map.of(TRUE_PROCESSING_RATE, 2000.)),
+ Map.of(new Edge(source, sink), 2.)));
conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofMinutes(1));
- evaluatedMetrics = evaluator.evaluate(conf, new
CollectedMetrics(topology, metricHistory));
+ evaluatedMetrics =
+ evaluator.evaluate(conf, new CollectedMetricHistory(topology,
metricHistory));
assertEquals(
new EvaluatedScalingMetric(100, 100),
evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
@@ -231,65 +219,73 @@ public class ScalingMetricEvaluatorTest {
new VertexInfo(source, Collections.emptySet(), 1, 1),
new VertexInfo(sink, Set.of(source), 1, 1));
- var metricHistory = new TreeMap<Instant, Map<JobVertexID,
Map<ScalingMetric, Double>>>();
+ var metricHistory = new TreeMap<Instant, CollectedMetrics>();
// 0 lag
metricHistory.put(
Instant.now(),
- Map.of(
- source,
- Map.of(LAG, 0., CURRENT_PROCESSING_RATE, 100.),
- sink,
- Map.of(TRUE_PROCESSING_RATE, 2000.)));
+ new CollectedMetrics(
+ Map.of(
+ source,
+ Map.of(LAG, 0., CURRENT_PROCESSING_RATE, 100.),
+ sink,
+ Map.of(TRUE_PROCESSING_RATE, 2000.)),
+ Collections.emptyMap()));
assertFalse(ScalingMetricEvaluator.isProcessingBacklog(topology,
metricHistory, conf));
// Missing lag
metricHistory.clear();
metricHistory.put(
Instant.now(),
- Map.of(
- source,
- Map.of(CURRENT_PROCESSING_RATE, 100.),
- sink,
- Map.of(TRUE_PROCESSING_RATE, 2000.)));
+ new CollectedMetrics(
+ Map.of(
+ source,
+ Map.of(CURRENT_PROCESSING_RATE, 100.),
+ sink,
+ Map.of(TRUE_PROCESSING_RATE, 2000.)),
+ Collections.emptyMap()));
assertFalse(ScalingMetricEvaluator.isProcessingBacklog(topology,
metricHistory, conf));
// Catch up time is more than a minute at avg proc rate (200)
metricHistory.put(
Instant.now(),
- Map.of(
- source,
+ new CollectedMetrics(
Map.of(
- LAG,
- 250.
- * conf.get(
- AutoScalerOptions
-
.BACKLOG_PROCESSING_LAG_THRESHOLD)
- .toSeconds(),
- CURRENT_PROCESSING_RATE,
- 300.),
- sink,
- Map.of(TRUE_PROCESSING_RATE, 2000.)));
+ source,
+ Map.of(
+ LAG,
+ 250.
+ * conf.get(
+
AutoScalerOptions
+
.BACKLOG_PROCESSING_LAG_THRESHOLD)
+ .toSeconds(),
+ CURRENT_PROCESSING_RATE,
+ 300.),
+ sink,
+ Map.of(TRUE_PROCESSING_RATE, 2000.)),
+ Collections.emptyMap()));
assertTrue(ScalingMetricEvaluator.isProcessingBacklog(topology,
metricHistory, conf));
// Catch up time is less than a minute at avg proc rate (200)
metricHistory.put(
Instant.now(),
- Map.of(
- source,
+ new CollectedMetrics(
Map.of(
- LAG,
- 180.
- * conf.get(
- AutoScalerOptions
-
.BACKLOG_PROCESSING_LAG_THRESHOLD)
- .toSeconds(),
- CURRENT_PROCESSING_RATE,
- 200.),
- sink,
- Map.of(TRUE_PROCESSING_RATE, 2000.)));
+ source,
+ Map.of(
+ LAG,
+ 180.
+ * conf.get(
+
AutoScalerOptions
+
.BACKLOG_PROCESSING_LAG_THRESHOLD)
+ .toSeconds(),
+ CURRENT_PROCESSING_RATE,
+ 200.),
+ sink,
+ Map.of(TRUE_PROCESSING_RATE, 2000.)),
+ Collections.emptyMap()));
assertFalse(ScalingMetricEvaluator.isProcessingBacklog(topology,
metricHistory, conf));
}
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
index c9aa95b2..4783bd98 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
@@ -57,9 +57,9 @@ public class ScalingMetricsTest {
FlinkMetric.BUSY_TIME_PER_SEC,
new AggregatedMetric("", Double.NaN, 100., Double.NaN,
Double.NaN),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, 1000.),
+ aggSum(1000.),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, 2000.)),
+ aggSum(2000.)),
scalingMetrics,
topology,
15.,
@@ -69,10 +69,6 @@ public class ScalingMetricsTest {
Map.of(
ScalingMetric.TRUE_PROCESSING_RATE,
10000.,
- ScalingMetric.TRUE_OUTPUT_RATE,
- 20000.,
- ScalingMetric.OUTPUT_RATIO,
- 2.,
ScalingMetric.SOURCE_DATA_RATE,
1015.,
ScalingMetric.CURRENT_PROCESSING_RATE,
@@ -87,9 +83,9 @@ public class ScalingMetricsTest {
FlinkMetric.BUSY_TIME_PER_SEC,
new AggregatedMetric("", Double.NaN, 100., Double.NaN,
Double.NaN),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, 1000.),
+ aggSum(1000.),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, 2000.)),
+ aggSum(2000.)),
scalingMetrics,
topology,
-50.,
@@ -99,10 +95,6 @@ public class ScalingMetricsTest {
Map.of(
ScalingMetric.TRUE_PROCESSING_RATE,
10000.,
- ScalingMetric.TRUE_OUTPUT_RATE,
- 20000.,
- ScalingMetric.OUTPUT_RATIO,
- 2.,
ScalingMetric.SOURCE_DATA_RATE,
950.,
ScalingMetric.CURRENT_PROCESSING_RATE,
@@ -116,9 +108,9 @@ public class ScalingMetricsTest {
FlinkMetric.BUSY_TIME_PER_SEC,
new AggregatedMetric("", Double.NaN, 100., Double.NaN,
Double.NaN),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, 1000.),
+ aggSum(1000.),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, 2000.)),
+ aggSum(2000.)),
scalingMetrics,
topology,
0.,
@@ -128,10 +120,6 @@ public class ScalingMetricsTest {
Map.of(
ScalingMetric.TRUE_PROCESSING_RATE,
10000.,
- ScalingMetric.TRUE_OUTPUT_RATE,
- 20000.,
- ScalingMetric.OUTPUT_RATIO,
- 2.,
ScalingMetric.CURRENT_PROCESSING_RATE,
1000.),
scalingMetrics);
@@ -146,9 +134,9 @@ public class ScalingMetricsTest {
FlinkMetric.BUSY_TIME_PER_SEC,
new AggregatedMetric("", Double.NaN, Double.NaN, 100.,
Double.NaN),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, 1000.),
+ aggSum(1000.),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, 2000.)),
+ aggSum(2000.)),
scalingMetrics,
topology,
0.,
@@ -158,10 +146,6 @@ public class ScalingMetricsTest {
Map.of(
ScalingMetric.TRUE_PROCESSING_RATE,
10000.,
- ScalingMetric.TRUE_OUTPUT_RATE,
- 20000.,
- ScalingMetric.OUTPUT_RATIO,
- 2.,
ScalingMetric.CURRENT_PROCESSING_RATE,
1000.),
scalingMetrics);
@@ -187,11 +171,11 @@ public class ScalingMetricsTest {
Map.of(
// Busy time is NaN for legacy sources
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, Double.NaN),
+ aggSum(Double.NaN),
FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, 2000.),
+ aggSum(2000.),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, 4000.)),
+ aggSum(4000.)),
scalingMetrics,
topology,
0.,
@@ -206,10 +190,6 @@ public class ScalingMetricsTest {
2000 / conf.get(AutoScalerOptions.TARGET_UTILIZATION),
scalingMetrics.get(ScalingMetric.TRUE_PROCESSING_RATE));
assertEquals(2000, scalingMetrics.get(ScalingMetric.SOURCE_DATA_RATE));
- assertEquals(
- scalingMetrics.get(ScalingMetric.TRUE_PROCESSING_RATE) * 2,
- scalingMetrics.get(ScalingMetric.TRUE_OUTPUT_RATE));
- assertEquals(2, scalingMetrics.get(ScalingMetric.OUTPUT_RATIO));
}
@Test
@@ -235,10 +215,6 @@ public class ScalingMetricsTest {
ScalingMetric.TRUE_PROCESSING_RATE,
// When not busy at all, we have infinite processing
power
Double.POSITIVE_INFINITY,
- ScalingMetric.TRUE_OUTPUT_RATE,
- Double.POSITIVE_INFINITY,
- ScalingMetric.OUTPUT_RATIO,
- 1.,
ScalingMetric.SOURCE_DATA_RATE,
dataRate,
ScalingMetric.CURRENT_PROCESSING_RATE,
@@ -256,11 +232,6 @@ public class ScalingMetricsTest {
ScalingMetric.TRUE_PROCESSING_RATE,
// When no records are coming in, we assume infinite
processing power
Double.POSITIVE_INFINITY,
- ScalingMetric.TRUE_OUTPUT_RATE,
- Double.POSITIVE_INFINITY,
- ScalingMetric.OUTPUT_RATIO,
- // We are not producing any records
- 0.,
ScalingMetric.SOURCE_DATA_RATE,
0.,
ScalingMetric.CURRENT_PROCESSING_RATE,
@@ -274,13 +245,9 @@ public class ScalingMetricsTest {
assertEquals(
Map.of(
// If there is zero input the out ratio must be zero
- ScalingMetric.OUTPUT_RATIO,
- 0.,
ScalingMetric.TRUE_PROCESSING_RATE,
// When no records are coming in, we assume infinite
processing power
Double.POSITIVE_INFINITY,
- ScalingMetric.TRUE_OUTPUT_RATE,
- Double.POSITIVE_INFINITY,
ScalingMetric.SOURCE_DATA_RATE,
0.,
ScalingMetric.CURRENT_PROCESSING_RATE,
@@ -296,10 +263,6 @@ public class ScalingMetricsTest {
ScalingMetric.TRUE_PROCESSING_RATE,
// Nothing is coming in, we must assume infinite
processing power
Double.POSITIVE_INFINITY,
- ScalingMetric.TRUE_OUTPUT_RATE,
- Double.POSITIVE_INFINITY,
- ScalingMetric.OUTPUT_RATIO,
- 0.,
ScalingMetric.SOURCE_DATA_RATE,
0.,
ScalingMetric.CURRENT_PROCESSING_RATE,
@@ -329,7 +292,7 @@ public class ScalingMetricsTest {
new AggregatedMetric(
"", Double.NaN, Double.NaN, Double.NaN,
processingRate),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, outputRate)),
+ aggSum(outputRate)),
scalingMetrics,
topology,
0.,
@@ -337,4 +300,90 @@ public class ScalingMetricsTest {
return scalingMetrics;
}
+
+ @Test
+ public void testComputableOutputRatios() {
+ var source1 = new JobVertexID();
+ var source2 = new JobVertexID();
+
+ var op1 = new JobVertexID();
+ var sink1 = new JobVertexID();
+
+ var topology =
+ new JobTopology(
+ new VertexInfo(source1, Collections.emptySet(), 1, 1),
+ new VertexInfo(source2, Collections.emptySet(), 1, 1),
+ new VertexInfo(op1, Set.of(source1, source2), 1, 1),
+ new VertexInfo(sink1, Set.of(op1), 1, 1));
+
+ var allMetrics = new HashMap<JobVertexID, Map<FlinkMetric,
AggregatedMetric>>();
+ allMetrics.put(
+ source1,
+ Map.of(
+ FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+ aggSum(100),
+ FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+ aggSum(200)));
+ allMetrics.put(
+ source2,
+ Map.of(
+ FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+ aggSum(100),
+ FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+ aggSum(50)));
+
+ allMetrics.put(op1, Map.of(FlinkMetric.NUM_RECORDS_IN_PER_SEC,
aggSum(250)));
+ allMetrics.put(sink1, Map.of(FlinkMetric.NUM_RECORDS_IN_PER_SEC,
aggSum(50)));
+
+ assertEquals(
+ Map.of(
+ new Edge(source1, op1), 2.,
+ new Edge(source2, op1), 0.5,
+ new Edge(op1, sink1), 0.2),
+ ScalingMetrics.computeOutputRatios(allMetrics, topology));
+ }
+
+ @Test
+ public void testOutputRatioFallbackToOutPerSecond() {
+ var source1 = new JobVertexID();
+ var source2 = new JobVertexID();
+
+ var op1 = new JobVertexID();
+ var op2 = new JobVertexID();
+
+ var topology =
+ new JobTopology(
+ new VertexInfo(source1, Collections.emptySet(), 1, 1),
+ new VertexInfo(source2, Collections.emptySet(), 1, 1),
+ new VertexInfo(op1, Set.of(source1, source2), 1, 1),
+ new VertexInfo(op2, Set.of(source1, source2), 1, 1));
+
+ var allMetrics = new HashMap<JobVertexID, Map<FlinkMetric,
AggregatedMetric>>();
+ allMetrics.put(
+ source1,
+ Map.of(
+ FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+ aggSum(100),
+ FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+ aggSum(200)));
+ allMetrics.put(
+ source2,
+ Map.of(
+ FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+ aggSum(100),
+ FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+ aggSum(50)));
+
+ assertEquals(
+ Map.of(
+ new Edge(source1, op1), 2.,
+ new Edge(source2, op1), 0.5,
+ new Edge(source1, op2), 2.,
+ new Edge(source2, op2), 0.5),
+ ScalingMetrics.computeOutputRatios(allMetrics, topology));
+ }
+
+ private static AggregatedMetric aggSum(double sum) {
+ return new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN,
sum);
+ }
}