This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 51d7111c [FLINK-30575] Ensure zero rates lead to scale down (#543)
51d7111c is described below
commit 51d7111c03dfa9744638765869d917142d1cf401
Author: Maximilian Michels <[email protected]>
AuthorDate: Mon Mar 6 19:06:33 2023 +0100
[FLINK-30575] Ensure zero rates lead to scale down (#543)
The logic for computing true processing rates relies on recordsProcessed /
recordProcessingTime style computations which do not really work well when
everything is 0. This leads to no scaling actions when the load suddenly
drops
to 0.
The change here takes a pragmatic approach. Instead of special-casing zero
rates throughout the code, we set rates and busyness to near zero values.
This
keeps the code functional as-is while ensuring scale down happens.
---
.../autoscaler/metrics/ScalingMetrics.java | 78 +++++++++++++++------
.../MetricsCollectionAndEvaluationTest.java | 39 +++++++++++
.../autoscaler/metrics/ScalingMetricsTest.java | 81 ++++++++++++++++++++++
3 files changed, 175 insertions(+), 23 deletions(-)
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
index adadb365..7e99577e 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
@@ -36,6 +36,12 @@ public class ScalingMetrics {
private static final Logger LOG =
LoggerFactory.getLogger(ScalingMetrics.class);
+ /**
+ * The minimum value to avoid using zero values which cause side effects
like division by zero
or out of bounds (infinite) floats.
+ */
+ public static final double EFFECTIVELY_ZERO = 1e-10;
+
public static void computeLoadMetrics(
Map<FlinkMetric, AggregatedMetric> flinkMetrics,
Map<ScalingMetric, Double> scalingMetrics) {
@@ -73,19 +79,13 @@ public class ScalingMetrics {
return;
}
- var numRecordsInPerSecond =
flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
- if (numRecordsInPerSecond == null) {
- numRecordsInPerSecond =
-
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
- }
-
- var outputPerSecond =
flinkMetrics.get(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
+ Double numRecordsInPerSecond = getNumRecordsInPerSecond(flinkMetrics,
jobVertexID);
+ Double outputPerSecond = getNumRecordsOutPerSecond(flinkMetrics,
jobVertexID, sink);
- double busyTimeMultiplier = 1000 / busyTimeOpt.get();
+ double busyTimeMultiplier = 1000 / keepAboveZero(busyTimeOpt.get());
if (source && !conf.getBoolean(SOURCE_SCALING_ENABLED)) {
- double sourceInputRate =
- numRecordsInPerSecond != null ?
numRecordsInPerSecond.getSum() : Double.NaN;
+ double sourceInputRate = numRecordsInPerSecond;
double targetDataRate;
if (!Double.isNaN(sourceInputRate) && sourceInputRate > 0) {
@@ -97,9 +97,8 @@ public class ScalingMetrics {
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC).getSum();
}
scalingMetrics.put(ScalingMetric.TRUE_PROCESSING_RATE, Double.NaN);
- scalingMetrics.put(
- ScalingMetric.OUTPUT_RATIO, outputPerSecond.getSum() /
targetDataRate);
- var trueOutputRate = busyTimeMultiplier * outputPerSecond.getSum();
+ scalingMetrics.put(ScalingMetric.OUTPUT_RATIO, outputPerSecond /
targetDataRate);
+ var trueOutputRate = busyTimeMultiplier * outputPerSecond;
scalingMetrics.put(ScalingMetric.TRUE_OUTPUT_RATE, trueOutputRate);
scalingMetrics.put(ScalingMetric.TARGET_DATA_RATE, trueOutputRate);
LOG.info(
@@ -108,14 +107,13 @@ public class ScalingMetrics {
trueOutputRate);
} else {
if (source) {
- if (!lagGrowthOpt.isPresent() ||
numRecordsInPerSecond.getSum().isNaN()) {
+ if (!lagGrowthOpt.isPresent() ||
numRecordsInPerSecond.isNaN()) {
LOG.error(
"Cannot compute source target data rate without
numRecordsInPerSecond and pendingRecords (lag) metric for {}.",
jobVertexID);
scalingMetrics.put(ScalingMetric.TARGET_DATA_RATE,
Double.NaN);
} else {
- double sourceDataRate =
- Math.max(0, numRecordsInPerSecond.getSum() +
lagGrowthOpt.get());
+ double sourceDataRate = Math.max(0, numRecordsInPerSecond
+ lagGrowthOpt.get());
LOG.info(
"Using computed source data rate {} for {}",
sourceDataRate,
@@ -124,8 +122,8 @@ public class ScalingMetrics {
}
}
- if (!numRecordsInPerSecond.getSum().isNaN()) {
- double trueProcessingRate = busyTimeMultiplier *
numRecordsInPerSecond.getSum();
+ if (!numRecordsInPerSecond.isNaN()) {
+ double trueProcessingRate = busyTimeMultiplier *
numRecordsInPerSecond;
if (trueProcessingRate <= 0 ||
!Double.isFinite(trueProcessingRate)) {
trueProcessingRate = Double.NaN;
}
@@ -135,13 +133,11 @@ public class ScalingMetrics {
}
if (!sink) {
- if (!outputPerSecond.getSum().isNaN()) {
+ if (!outputPerSecond.isNaN()) {
scalingMetrics.put(
- ScalingMetric.OUTPUT_RATIO,
- outputPerSecond.getSum() /
numRecordsInPerSecond.getSum());
+ ScalingMetric.OUTPUT_RATIO, outputPerSecond /
numRecordsInPerSecond);
scalingMetrics.put(
- ScalingMetric.TRUE_OUTPUT_RATE,
- busyTimeMultiplier * outputPerSecond.getSum());
+ ScalingMetric.TRUE_OUTPUT_RATE, busyTimeMultiplier
* outputPerSecond);
} else {
LOG.error(
"Cannot compute processing and input rate without
numRecordsOutPerSecond");
@@ -160,4 +156,40 @@ public class ScalingMetrics {
scalingMetrics.put(ScalingMetric.LAG, 0.);
}
}
+
+ private static Double getNumRecordsInPerSecond(
+ Map<FlinkMetric, AggregatedMetric> flinkMetrics, JobVertexID
jobVertexID) {
+ var numRecordsInPerSecond =
flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
+ if (numRecordsInPerSecond == null) {
+ numRecordsInPerSecond =
+
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
+ }
+ if (numRecordsInPerSecond == null) {
+ LOG.warn("Received null input rate for {}. Returning NaN.",
jobVertexID);
+ return Double.NaN;
+ }
+ return keepAboveZero(numRecordsInPerSecond.getSum());
+ }
+
+ private static Double getNumRecordsOutPerSecond(
+ Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+ JobVertexID jobVertexID,
+ boolean isSink) {
+ AggregatedMetric aggregatedMetric =
flinkMetrics.get(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
+ if (aggregatedMetric == null) {
+ if (!isSink) {
+ LOG.warn("Received null output rate for {}. Returning NaN.",
jobVertexID);
+ }
+ return Double.NaN;
+ }
+ return keepAboveZero(aggregatedMetric.getSum());
+ }
+
+ private static Double keepAboveZero(Double number) {
+ if (number <= 0) {
+            // Make the value really tiny but not zero
+ return EFFECTIVELY_ZERO;
+ }
+ return number;
+ }
}
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index ac5724e1..93a3097b 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -28,6 +28,7 @@ import
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
import
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetrics;
import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
@@ -397,6 +398,44 @@ public class MetricsCollectionAndEvaluationTest {
assertEquals(1, scaledParallelism.get(source1));
}
+ @Test
+ public void testScaleDownWithZeroProcessingRate() throws Exception {
+ var topology = new JobTopology(new VertexInfo(source1, Set.of(), 10,
720));
+
+ metricsCollector = new TestingMetricsCollector(topology);
+ metricsCollector.setCurrentMetrics(
+ Map.of(
+ // Set source1 metrics without the PENDING_RECORDS
metric
+ source1,
+ Map.of(
+ FlinkMetric.BUSY_TIME_PER_SEC,
+ new AggregatedMetric("", Double.NaN, 0.1,
Double.NaN, Double.NaN),
+ FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+ new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 0.),
+ FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+ new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 0.))));
+
+ var collectedMetrics = collectMetrics();
+
+ Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>
evaluation =
+ evaluator.evaluate(conf, collectedMetrics);
+ assertEquals(
+ ScalingMetrics.EFFECTIVELY_ZERO,
+
evaluation.get(source1).get(ScalingMetric.TARGET_DATA_RATE).getCurrent());
+ assertEquals(
+ 1E-6,
evaluation.get(source1).get(ScalingMetric.TRUE_PROCESSING_RATE).getCurrent());
+ assertEquals(
+ 0.,
+
evaluation.get(source1).get(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD).getCurrent());
+ assertEquals(
+ 0.,
+
evaluation.get(source1).get(ScalingMetric.SCALE_UP_RATE_THRESHOLD).getCurrent());
+
+ scalingExecutor.scaleResource(app, scalingInfo, conf, evaluation);
+ var scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+ assertEquals(1, scaledParallelism.get(source1));
+ }
+
private CollectedMetrics collectMetrics() throws Exception {
conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
conf.set(AutoScalerOptions.METRICS_WINDOW, Duration.ofSeconds(2));
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
index de349285..9622d0ba 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
@@ -203,4 +203,85 @@ public class ScalingMetricsTest {
assertEquals(0.2, scalingMetrics.get(ScalingMetric.LOAD_MAX));
assertEquals(0.1, scalingMetrics.get(ScalingMetric.LOAD_AVG));
}
+
+ @Test
+ public void testZeroValuesForBusyness() {
+ double dataRate = 10;
+ Map<ScalingMetric, Double> scalingMetrics =
testZeroValuesForRatesOrBusyness(dataRate, 0);
+ assertEquals(
+ Map.of(
+ ScalingMetric.TRUE_PROCESSING_RATE,
+ 1.0E14,
+ ScalingMetric.TRUE_OUTPUT_RATE,
+ 1.0E14,
+ ScalingMetric.OUTPUT_RATIO,
+ 1.,
+ ScalingMetric.SOURCE_DATA_RATE,
+ dataRate),
+ scalingMetrics);
+ }
+
+ @Test
+ public void testZeroValuesForRates() {
+ double busyMillisecondPerSec = 100;
+ Map<ScalingMetric, Double> scalingMetrics =
+ testZeroValuesForRatesOrBusyness(0, busyMillisecondPerSec);
+ assertEquals(
+ Map.of(
+ ScalingMetric.TRUE_PROCESSING_RATE,
+ 1.0E-9,
+ ScalingMetric.TRUE_OUTPUT_RATE,
+ 1.0E-9,
+ ScalingMetric.OUTPUT_RATIO,
+ 1.,
+ ScalingMetric.SOURCE_DATA_RATE,
+ ScalingMetrics.EFFECTIVELY_ZERO),
+ scalingMetrics);
+ }
+
+ @Test
+ public void testZeroValuesForRateAndBusyness() {
+ Map<ScalingMetric, Double> scalingMetrics =
testZeroValuesForRatesOrBusyness(0, 0);
+ assertEquals(
+ Map.of(
+ ScalingMetric.TRUE_PROCESSING_RATE,
+ 1000.0,
+ ScalingMetric.TRUE_OUTPUT_RATE,
+ 1000.0,
+ ScalingMetric.OUTPUT_RATIO,
+ 1.,
+ ScalingMetric.SOURCE_DATA_RATE,
+ ScalingMetrics.EFFECTIVELY_ZERO),
+ scalingMetrics);
+ }
+
+ private static Map<ScalingMetric, Double> testZeroValuesForRatesOrBusyness(
+ double rate, double busyness) {
+ var source = new JobVertexID();
+ var op = new JobVertexID();
+ var sink = new JobVertexID();
+
+ var topology =
+ new JobTopology(
+ new VertexInfo(source, Collections.emptySet(), 1, 1),
+ new VertexInfo(op, Set.of(source), 1, 1),
+ new VertexInfo(sink, Set.of(op), 1, 1));
+
+ Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
+ ScalingMetrics.computeDataRateMetrics(
+ source,
+ Map.of(
+ FlinkMetric.BUSY_TIME_PER_SEC,
+ new AggregatedMetric("", Double.NaN, busyness,
Double.NaN, Double.NaN),
+ FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+ new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, rate),
+ FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+ new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, rate)),
+ scalingMetrics,
+ topology,
+ Optional.of(0.),
+ new Configuration());
+
+ return scalingMetrics;
+ }
}