This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new ad8b4abf [FLINK-31326] Consolidate source scaling logic (#546)
ad8b4abf is described below
commit ad8b4abf7334c7f196bd5a43f43e6181c9466574
Author: Maximilian Michels <[email protected]>
AuthorDate: Thu Mar 9 14:25:58 2023 +0100
[FLINK-31326] Consolidate source scaling logic (#546)
Previous changes removed the requirement for sources to have the
pendingRecords metric. This allows us to refactor and consolidate the source
scaling code. We remove the option to disable scaling sources because the
primary motivation was to skip scaling legacy sources, which we can achieve
without an extra configuration option.
The refactored source scaling logic works like this:
* Modern sources (busy time available)
Scale the source up based on the busy time. Add extra capacity if the lag
information, provided by the pending records metric, is available. Otherwise
assume 0 lag. Cap by the maximum number of partitions, if available.
* Legacy sources (busy time unavailable)
Leave the source parallelism unchanged and use the current processing rate
as the target rate downstream.
---
.../generated/auto_scaler_configuration.html | 6 -
examples/autoscaling/autoscaling.yaml | 1 -
.../autoscaler/ScalingMetricCollector.java | 57 +++----
.../autoscaler/ScalingMetricEvaluator.java | 10 +-
.../autoscaler/config/AutoScalerOptions.java | 11 +-
.../autoscaler/metrics/MetricAggregator.java | 9 +-
.../autoscaler/metrics/ScalingMetrics.java | 171 ++++++++++-----------
.../MetricsCollectionAndEvaluationTest.java | 2 +-
.../autoscaler/metrics/ScalingMetricsTest.java | 44 +++---
9 files changed, 147 insertions(+), 164 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index f21a16c8..11461049 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -86,12 +86,6 @@
<td>Boolean</td>
<td>Enable vertex scaling execution by the autoscaler. If
disabled, the autoscaler will only collect metrics and evaluate the suggested
parallelism for each vertex but will not upgrade the jobs.</td>
</tr>
- <tr>
-
<td><h5>kubernetes.operator.job.autoscaler.scaling.sources.enabled</h5></td>
- <td style="word-wrap: break-word;">true</td>
- <td>Boolean</td>
- <td>Whether to enable scaling source vertices. Source vertices set
the baseline ingestion rate for the processing based on the backlog size. If
disabled, only regular job vertices will be scaled and source vertices will be
unchanged.</td>
- </tr>
<tr>
<td><h5>kubernetes.operator.job.autoscaler.stabilization.interval</h5></td>
<td style="word-wrap: break-word;">5 min</td>
diff --git a/examples/autoscaling/autoscaling.yaml
b/examples/autoscaling/autoscaling.yaml
index 509db5d2..4b332dd1 100644
--- a/examples/autoscaling/autoscaling.yaml
+++ b/examples/autoscaling/autoscaling.yaml
@@ -25,7 +25,6 @@ spec:
flinkVersion: v1_17
flinkConfiguration:
kubernetes.operator.job.autoscaler.enabled: "true"
- kubernetes.operator.job.autoscaler.scaling.sources.enabled: "false"
kubernetes.operator.job.autoscaler.stabilization.interval: "1m"
kubernetes.operator.job.autoscaler.metrics.window: "3m"
pipeline.max-parallelism: "24"
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index c9265387..38868c6b 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -42,7 +42,6 @@ import org.apache.flink.util.Preconditions;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import lombok.SneakyThrows;
-import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,8 +60,6 @@ import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
-import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SOURCE_SCALING_ENABLED;
-
/** Metric collector using flink rest api. */
public abstract class ScalingMetricCollector {
private static final Logger LOG =
LoggerFactory.getLogger(ScalingMetricCollector.class);
@@ -249,7 +246,7 @@ public abstract class ScalingMetricCollector {
}
ScalingMetrics.computeLoadMetrics(vertexFlinkMetrics,
vertexScalingMetrics);
- Optional<Double> lagGrowthRate =
+ double lagGrowthRate =
computeLagGrowthRate(
resourceID,
jobVertexID,
@@ -274,30 +271,29 @@ public abstract class ScalingMetricCollector {
return out;
}
- @NotNull
- private Optional<Double> computeLagGrowthRate(
+ private double computeLagGrowthRate(
ResourceID resourceID, JobVertexID jobVertexID, Double currentLag)
{
var metricHistory = histories.get(resourceID);
if (metricHistory == null || metricHistory.isEmpty()) {
- return Optional.empty();
+ return Double.NaN;
}
var lastCollectionTime = metricHistory.lastKey();
var lastCollectedMetrics =
metricHistory.get(lastCollectionTime).get(jobVertexID);
if (lastCollectedMetrics == null) {
- return Optional.empty();
+ return Double.NaN;
}
var lastLag = lastCollectedMetrics.get(ScalingMetric.LAG);
if (lastLag == null || currentLag == null) {
- return Optional.empty();
+ return Double.NaN;
}
var timeDiff = Duration.between(lastCollectionTime,
clock.instant()).toSeconds();
- return Optional.of((currentLag - lastLag) / timeDiff);
+ return (currentLag - lastLag) / timeDiff;
}
/** Query the available metric names for each job vertex for the current
spec generation. */
@@ -373,20 +369,25 @@ public abstract class ScalingMetricCollector {
if (topology.isSource(jobVertexID)) {
requiredMetrics.add(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
- if (conf.getBoolean(SOURCE_SCALING_ENABLED)) {
- requiredMetrics.add(FlinkMetric.PENDING_RECORDS);
- } else {
- FlinkMetric.PENDING_RECORDS
- .findAny(allMetricNames)
- .ifPresent(m -> filteredMetrics.put(m,
FlinkMetric.PENDING_RECORDS));
- FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC
- .findAny(allMetricNames)
- .ifPresent(
- m ->
- filteredMetrics.put(
- m,
-
FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC));
- }
+ // Pending records metric won't be available for some sources.
+ // The Kafka source, for instance, lazily initializes this metric
on receiving
+ // the first record. If this is a fresh topic or no new data has
been read since
+ // the last checkpoint, the pendingRecords metrics won't be
available. Also, legacy
+ // sources do not have this metric.
+ Optional<String> pendingRecordsMetric =
+ FlinkMetric.PENDING_RECORDS.findAny(allMetricNames);
+ pendingRecordsMetric.ifPresentOrElse(
+ m -> filteredMetrics.put(m, FlinkMetric.PENDING_RECORDS),
+ () ->
+ LOG.warn(
+ "pendingRecords metric for {} could not be
found. Either a legacy source or an idle source. Assuming no pending records.",
+ jobVertexID));
+ FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC
+ .findAny(allMetricNames)
+ .ifPresent(
+ m ->
+ filteredMetrics.put(
+ m,
FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC));
} else {
// Not a source so we must have numRecordsInPerSecond
requiredMetrics.add(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
@@ -402,14 +403,6 @@ public abstract class ScalingMetricCollector {
if (flinkMetricName.isPresent()) {
// Add actual Flink metric name to list
filteredMetrics.put(flinkMetricName.get(), flinkMetric);
- } else if (flinkMetric == FlinkMetric.PENDING_RECORDS) {
- // Pending records metric won't be available for some sources.
- // The Kafka source, for instance, lazily initializes this
metric on receiving
- // the first record. If this is a fresh topic or no new data
has been read since
- // the last checkpoint, the pendingRecords metrics won't be
available.
- LOG.warn(
- "pendingRecords metric for {} could not be found. This
usually means the source hasn't read data. Assuming 0 pending records.",
- jobVertexID);
} else {
throw new RuntimeException(
"Could not find required metric "
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
index 7110fd5e..1116c0a5 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
@@ -205,11 +205,7 @@ public class ScalingMetricEvaluator {
if (topology.isSource(vertex)) {
double catchUpTargetSec =
conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds();
- var sourceRateMetric =
- latestVertexMetrics.containsKey(TARGET_DATA_RATE)
- ? TARGET_DATA_RATE
- : SOURCE_DATA_RATE;
- if (!latestVertexMetrics.containsKey(sourceRateMetric)) {
+ if (!latestVertexMetrics.containsKey(SOURCE_DATA_RATE)) {
throw new RuntimeException(
"Cannot evaluate metrics without source target rate
information");
}
@@ -217,8 +213,8 @@ public class ScalingMetricEvaluator {
out.put(
TARGET_DATA_RATE,
new EvaluatedScalingMetric(
- latestVertexMetrics.get(sourceRateMetric),
- getAverage(sourceRateMetric, vertex,
metricsHistory)));
+ latestVertexMetrics.get(SOURCE_DATA_RATE),
+ getAverage(SOURCE_DATA_RATE, vertex,
metricsHistory)));
double lag = latestVertexMetrics.getOrDefault(LAG, 0.);
double catchUpInputRate = catchUpTargetSec == 0 ? 0 : lag /
catchUpTargetSec;
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
index 64643433..4d496f02 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
@@ -59,15 +59,6 @@ public class AutoScalerOptions {
.withDescription(
"Stabilization period in which no new scaling will
be executed");
- public static final ConfigOption<Boolean> SOURCE_SCALING_ENABLED =
- autoScalerConfig("scaling.sources.enabled")
- .booleanType()
- .defaultValue(true)
- .withDescription(
- "Whether to enable scaling source vertices. "
- + "Source vertices set the baseline
ingestion rate for the processing based on the backlog size. "
- + "If disabled, only regular job vertices
will be scaled and source vertices will be unchanged.");
-
public static final ConfigOption<Double> TARGET_UTILIZATION =
autoScalerConfig("target.utilization")
.doubleType()
@@ -166,7 +157,7 @@ public class AutoScalerOptions {
autoScalerConfig("vertex.exclude.ids")
.stringType()
.asList()
- .defaultValues(new String[0])
+ .defaultValues()
.withDescription(
"A (semicolon-separated) list of vertex ids in
hexstring for which to disable scaling. Caution: For non-sink vertices this
will still scale their downstream operators until
https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
}
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
index 07765fe8..c79d4b7d 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
@@ -19,7 +19,6 @@ package
org.apache.flink.kubernetes.operator.autoscaler.metrics;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
-import java.util.Optional;
import java.util.function.Function;
/** Enum specifying which aggregator to use when getting a metric value. */
@@ -34,7 +33,11 @@ public enum MetricAggregator {
this.getter = getter;
}
- public Optional<Double> get(AggregatedMetric metric) {
- return Optional.ofNullable(metric).map(getter).filter(d -> !d.isNaN());
+ public double get(AggregatedMetric metric) {
+ if (metric != null) {
+ return getter.apply(metric);
+ } else {
+ return Double.NaN;
+ }
}
}
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
index 05a947d9..bf508c71 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
@@ -27,10 +27,10 @@ import org.apache.commons.math3.util.Precision;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.Map;
-import java.util.Optional;
-
-import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SOURCE_SCALING_ENABLED;
+import java.util.Set;
/** Utilities for computing scaling metrics based on Flink metrics. */
public class ScalingMetrics {
@@ -66,85 +66,47 @@ public class ScalingMetrics {
Map<FlinkMetric, AggregatedMetric> flinkMetrics,
Map<ScalingMetric, Double> scalingMetrics,
JobTopology topology,
- Optional<Double> lagGrowthOpt,
+ double lagGrowthRate,
Configuration conf) {
- var source = topology.getInputs().get(jobVertexID).isEmpty();
- var sink = topology.getOutputs().get(jobVertexID).isEmpty();
-
- var busyTimeAggregator =
conf.get(AutoScalerOptions.BUSY_TIME_AGGREGATOR);
- var busyTimeOpt =
busyTimeAggregator.get(flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC));
-
- if (busyTimeOpt.isEmpty()) {
- LOG.error("Cannot compute true processing/output rate without
busyTimeMsPerSecond");
- return;
- }
+ var isSource = topology.getInputs().get(jobVertexID).isEmpty();
+ var isSink = topology.getOutputs().get(jobVertexID).isEmpty();
- Double numRecordsInPerSecond = getNumRecordsInPerSecond(flinkMetrics,
jobVertexID);
- Double outputPerSecond = getNumRecordsOutPerSecond(flinkMetrics,
jobVertexID, sink);
+ double busyTime = getBusyTime(flinkMetrics, conf, jobVertexID);
+ double busyTimeMultiplier = 1000 / keepAboveZero(busyTime);
- double busyTimeMultiplier = 1000 / keepAboveZero(busyTimeOpt.get());
+ double numRecordsInPerSecond =
+ getNumRecordsInPerSecond(flinkMetrics, jobVertexID, isSource);
+ double outputPerSecond =
+ getNumRecordsOutPerSecond(flinkMetrics, jobVertexID, isSource,
isSink);
- if (source && !conf.getBoolean(SOURCE_SCALING_ENABLED)) {
- double sourceInputRate = numRecordsInPerSecond;
+ if (isSource) {
+ double sourceDataRate = Math.max(0, numRecordsInPerSecond +
lagGrowthRate);
+ LOG.info("Using computed source data rate {} for {}",
sourceDataRate, jobVertexID);
+ scalingMetrics.put(ScalingMetric.SOURCE_DATA_RATE, sourceDataRate);
+ scalingMetrics.put(ScalingMetric.CURRENT_PROCESSING_RATE,
numRecordsInPerSecond);
+ }
- double targetDataRate;
- if (!Double.isNaN(sourceInputRate) && sourceInputRate > 0) {
- targetDataRate = sourceInputRate;
- } else {
- // If source in metric is not available (maybe legacy source)
we use source
- // output that should always be available
- targetDataRate =
-
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC).getSum();
+ if (!Double.isNaN(numRecordsInPerSecond)) {
+ double trueProcessingRate = busyTimeMultiplier *
numRecordsInPerSecond;
+ if (trueProcessingRate <= 0 ||
!Double.isFinite(trueProcessingRate)) {
+ trueProcessingRate = Double.NaN;
}
- scalingMetrics.put(ScalingMetric.CURRENT_PROCESSING_RATE,
targetDataRate);
- scalingMetrics.put(ScalingMetric.TRUE_PROCESSING_RATE, Double.NaN);
- scalingMetrics.put(ScalingMetric.OUTPUT_RATIO, outputPerSecond /
targetDataRate);
- var trueOutputRate = busyTimeMultiplier * outputPerSecond;
- scalingMetrics.put(ScalingMetric.TRUE_OUTPUT_RATE, trueOutputRate);
- scalingMetrics.put(ScalingMetric.TARGET_DATA_RATE, trueOutputRate);
- LOG.info(
- "Scaling disabled for source {} using output rate {} as
target",
- jobVertexID,
- trueOutputRate);
+ scalingMetrics.put(ScalingMetric.TRUE_PROCESSING_RATE,
trueProcessingRate);
+ scalingMetrics.put(ScalingMetric.CURRENT_PROCESSING_RATE,
numRecordsInPerSecond);
} else {
- if (source) {
- if (!lagGrowthOpt.isPresent() ||
numRecordsInPerSecond.isNaN()) {
- LOG.error(
- "Cannot compute source target data rate without
numRecordsInPerSecond and pendingRecords (lag) metric for {}.",
- jobVertexID);
- scalingMetrics.put(ScalingMetric.TARGET_DATA_RATE,
Double.NaN);
- } else {
- double sourceDataRate = Math.max(0, numRecordsInPerSecond
+ lagGrowthOpt.get());
- LOG.info(
- "Using computed source data rate {} for {}",
- sourceDataRate,
- jobVertexID);
- scalingMetrics.put(ScalingMetric.SOURCE_DATA_RATE,
sourceDataRate);
- }
- }
+ LOG.error("Cannot compute true processing rate without
numRecordsInPerSecond");
+ }
- if (!numRecordsInPerSecond.isNaN()) {
- double trueProcessingRate = busyTimeMultiplier *
numRecordsInPerSecond;
- if (trueProcessingRate <= 0 ||
!Double.isFinite(trueProcessingRate)) {
- trueProcessingRate = Double.NaN;
- }
- scalingMetrics.put(ScalingMetric.TRUE_PROCESSING_RATE,
trueProcessingRate);
- scalingMetrics.put(ScalingMetric.CURRENT_PROCESSING_RATE,
numRecordsInPerSecond);
+ if (!isSink) {
+ if (!Double.isNaN(outputPerSecond)) {
+ scalingMetrics.put(
+ ScalingMetric.OUTPUT_RATIO, outputPerSecond /
numRecordsInPerSecond);
+ scalingMetrics.put(
+ ScalingMetric.TRUE_OUTPUT_RATE, busyTimeMultiplier *
outputPerSecond);
} else {
- LOG.error("Cannot compute true processing rate without
numRecordsInPerSecond");
- }
-
- if (!sink) {
- if (!outputPerSecond.isNaN()) {
- scalingMetrics.put(
- ScalingMetric.OUTPUT_RATIO, outputPerSecond /
numRecordsInPerSecond);
- scalingMetrics.put(
- ScalingMetric.TRUE_OUTPUT_RATE, busyTimeMultiplier
* outputPerSecond);
- } else {
- LOG.error(
- "Cannot compute processing and input rate without
numRecordsOutPerSecond");
- }
+ LOG.error(
+ "Cannot compute processing and input rate without
numRecordsOutPerSecond");
}
}
}
@@ -160,13 +122,36 @@ public class ScalingMetrics {
}
}
- private static Double getNumRecordsInPerSecond(
- Map<FlinkMetric, AggregatedMetric> flinkMetrics, JobVertexID
jobVertexID) {
+ private static double getBusyTime(
+ Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+ Configuration conf,
+ JobVertexID jobVertexId) {
+ var busyTimeAggregator =
conf.get(AutoScalerOptions.BUSY_TIME_AGGREGATOR);
+ var busyTime =
busyTimeAggregator.get(flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC));
+ if (!Double.isFinite(busyTime)) {
+ LOG.error(
+ "No busyTimeMsPerSecond metric available for {}. No
scaling will be performed for this vertex.",
+ jobVertexId);
+ excludeVertexFromScaling(conf, jobVertexId);
+ // Pretend that the load is balanced because we don't know any
better
+ return conf.get(AutoScalerOptions.TARGET_UTILIZATION) * 1000;
+ }
+ return busyTime;
+ }
+
+ private static double getNumRecordsInPerSecond(
+ Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+ JobVertexID jobVertexID,
+ boolean isSource) {
var numRecordsInPerSecond =
flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
- if (numRecordsInPerSecond == null) {
+ if (isSource && (numRecordsInPerSecond == null ||
numRecordsInPerSecond.getSum() == 0)) {
numRecordsInPerSecond =
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
}
+ if (isSource && (numRecordsInPerSecond == null ||
numRecordsInPerSecond.getSum() == 0)) {
+ numRecordsInPerSecond =
+
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
+ }
if (numRecordsInPerSecond == null) {
LOG.warn("Received null input rate for {}. Returning NaN.",
jobVertexID);
return Double.NaN;
@@ -174,26 +159,40 @@ public class ScalingMetrics {
return keepAboveZero(numRecordsInPerSecond.getSum());
}
- private static Double getNumRecordsOutPerSecond(
+ private static double getNumRecordsOutPerSecond(
Map<FlinkMetric, AggregatedMetric> flinkMetrics,
JobVertexID jobVertexID,
+ boolean isSource,
boolean isSink) {
- AggregatedMetric aggregatedMetric =
flinkMetrics.get(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
- if (aggregatedMetric == null) {
- if (!isSink) {
- LOG.warn("Received null output rate for {}. Returning NaN.",
jobVertexID);
+ AggregatedMetric numRecordsOutPerSecond =
+ flinkMetrics.get(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
+ if (numRecordsOutPerSecond == null) {
+ if (isSource) {
+ numRecordsOutPerSecond =
+
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
+ }
+ if (numRecordsOutPerSecond == null) {
+ if (!isSink) {
+ LOG.warn("Received null output rate for {}. Returning
NaN.", jobVertexID);
+ }
+ return Double.NaN;
}
- return Double.NaN;
}
- return keepAboveZero(aggregatedMetric.getSum());
+ return keepAboveZero(numRecordsOutPerSecond.getSum());
}
- private static Double keepAboveZero(Double number) {
- if (number <= 0) {
- // Make busy time really tiny but not zero
+ private static double keepAboveZero(double value) {
+ if (value <= 0) {
+ // Make value tiny but not zero
return EFFECTIVELY_ZERO;
}
- return number;
+ return value;
+ }
+
+ private static void excludeVertexFromScaling(Configuration conf,
JobVertexID jobVertexId) {
+ Set<String> excludedIds = new
HashSet<>(conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS));
+ excludedIds.add(jobVertexId.toHexString());
+ conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, new
ArrayList<>(excludedIds));
}
public static double roundMetric(double value) {
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index 6cc0d839..9f93d5d5 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -409,7 +409,7 @@ public class MetricsCollectionAndEvaluationTest {
new AggregatedMetric("", Double.NaN, 0.1,
Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 0.),
- FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+
FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 0.))));
var collectedMetrics = collectMetrics();
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
index 55567cbe..62bc5a51 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
@@ -29,10 +29,10 @@ import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/** Tests for scaling metrics computation logic. */
public class ScalingMetricsTest {
@@ -61,7 +61,7 @@ public class ScalingMetricsTest {
new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, 2000.)),
scalingMetrics,
topology,
- Optional.of(15.),
+ 15.,
new Configuration());
assertEquals(
@@ -91,7 +91,7 @@ public class ScalingMetricsTest {
new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, 2000.)),
scalingMetrics,
topology,
- Optional.of(-50.),
+ -50.,
new Configuration());
assertEquals(
@@ -120,7 +120,7 @@ public class ScalingMetricsTest {
new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, 2000.)),
scalingMetrics,
topology,
- Optional.empty(),
+ 0.,
new Configuration());
assertEquals(
@@ -150,7 +150,7 @@ public class ScalingMetricsTest {
new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, 2000.)),
scalingMetrics,
topology,
- Optional.empty(),
+ 0.,
conf);
assertEquals(
@@ -167,35 +167,43 @@ public class ScalingMetricsTest {
}
@Test
- public void testSourceScalingDisabled() {
+ public void testLegacySourceScaling() {
var source = new JobVertexID();
+ var sink = new JobVertexID();
- var topology = new JobTopology(new VertexInfo(source,
Collections.emptySet(), 1, 1));
+ var topology =
+ new JobTopology(
+ new VertexInfo(source, Collections.emptySet(), 5, 1),
+ new VertexInfo(sink, Collections.singleton(source),
10, 100));
Configuration conf = new Configuration();
- // Disable scaling sources
- conf.setBoolean(AutoScalerOptions.SOURCE_SCALING_ENABLED, false);
Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
ScalingMetrics.computeDataRateMetrics(
source,
Map.of(
+ // Busy time is NaN for legacy sources
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, 500., Double.NaN,
Double.NaN),
+ new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, Double.NaN),
FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, 2000.),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, 4000.)),
scalingMetrics,
topology,
- Optional.empty(),
+ 0.,
conf);
- // Sources are not scaled, the rates are solely computed on the basis
of the true output
- // rate
- assertEquals(Double.NaN,
scalingMetrics.get(ScalingMetric.TRUE_PROCESSING_RATE));
- assertEquals(8000, scalingMetrics.get(ScalingMetric.TARGET_DATA_RATE));
- assertEquals(8000, scalingMetrics.get(ScalingMetric.TRUE_OUTPUT_RATE));
+ // Make sure vertex won't be scaled
+
assertTrue(conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS).contains(source.toHexString()));
+ // Legacy source rates are computed based on the current rate and a
balanced utilization
+ assertEquals(
+ 2000 / conf.get(AutoScalerOptions.TARGET_UTILIZATION),
+ scalingMetrics.get(ScalingMetric.TRUE_PROCESSING_RATE));
+ assertEquals(2000, scalingMetrics.get(ScalingMetric.SOURCE_DATA_RATE));
+ assertEquals(
+ scalingMetrics.get(ScalingMetric.TRUE_PROCESSING_RATE) * 2,
+ scalingMetrics.get(ScalingMetric.TRUE_OUTPUT_RATE));
assertEquals(2, scalingMetrics.get(ScalingMetric.OUTPUT_RATIO));
}
@@ -287,13 +295,13 @@ public class ScalingMetricsTest {
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
new AggregatedMetric("", Double.NaN, busyness,
Double.NaN, Double.NaN),
- FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+ FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, rate),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, rate)),
scalingMetrics,
topology,
- Optional.of(0.),
+ 0.,
new Configuration());
return scalingMetrics;