This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 8933435e [FLINK-31318] Improve stability during backlog processing
8933435e is described below
commit 8933435ecc647438967e6f522056b8e560d7750b
Author: Gyula Fora <[email protected]>
AuthorDate: Tue Mar 7 17:03:59 2023 +0100
[FLINK-31318] Improve stability during backlog processing
---
.../generated/auto_scaler_configuration.html | 6 ++
.../autoscaler/ScalingMetricEvaluator.java | 81 +++++++++++++++-----
.../autoscaler/config/AutoScalerOptions.java | 7 ++
.../operator/autoscaler/metrics/ScalingMetric.java | 3 +
.../autoscaler/metrics/ScalingMetrics.java | 2 +
.../autoscaler/BacklogBasedScalingTest.java | 1 -
.../operator/autoscaler/JobVertexScalerTest.java | 2 +-
.../MetricsCollectionAndEvaluationTest.java | 3 -
.../operator/autoscaler/ScalingExecutorTest.java | 2 +-
.../autoscaler/ScalingMetricEvaluatorTest.java | 88 +++++++++++++++++++++-
.../autoscaler/metrics/ScalingMetricsTest.java | 24 ++++--
11 files changed, 187 insertions(+), 32 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index 85d30e78..f21a16c8 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -8,6 +8,12 @@
</tr>
</thead>
<tbody>
+ <tr>
+
<td><h5>kubernetes.operator.job.autoscaler.backlog-processing.lag-threshold</h5></td>
+ <td style="word-wrap: break-word;">5 min</td>
+ <td>Duration</td>
+ <td>Lag threshold which will prevent unnecessary scalings while
removing the pending messages responsible for the lag.</td>
+ </tr>
<tr>
<td><h5>kubernetes.operator.job.autoscaler.catch-up.duration</h5></td>
<td style="word-wrap: break-word;">10 min</td>
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
index 36b11d49..6b61416b 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
@@ -26,22 +26,22 @@ import
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.Preconditions;
import org.apache.commons.math3.stat.StatUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.time.Clock;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
+import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD;
import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.OUTPUT_RATIO;
@@ -58,8 +58,6 @@ public class ScalingMetricEvaluator {
private static final Logger LOG =
LoggerFactory.getLogger(ScalingMetricEvaluator.class);
- private Clock clock = Clock.systemDefaultZone();
-
public Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>
evaluate(
Configuration conf, CollectedMetrics collectedMetrics) {
@@ -67,23 +65,58 @@ public class ScalingMetricEvaluator {
var metricsHistory = collectedMetrics.getMetricHistory();
var topology = collectedMetrics.getJobTopology();
+ boolean processingBacklog = isProcessingBacklog(topology,
metricsHistory, conf);
+
for (var vertex : topology.getVerticesInTopologicalOrder()) {
scalingOutput.put(
vertex,
computeVertexScalingSummary(
- conf, scalingOutput, metricsHistory, topology,
vertex));
+ conf,
+ scalingOutput,
+ metricsHistory,
+ topology,
+ vertex,
+ processingBacklog));
}
return scalingOutput;
}
+ @VisibleForTesting
+ protected static boolean isProcessingBacklog(
+ JobTopology topology,
+ SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>
metricsHistory,
+ Configuration conf) {
+ var lastMetrics = metricsHistory.get(metricsHistory.lastKey());
+ return topology.getVerticesInTopologicalOrder().stream()
+ .filter(topology::isSource)
+ .anyMatch(
+ vertex -> {
+ double lag =
lastMetrics.get(vertex).getOrDefault(LAG, 0.0);
+ double avgProcRate =
+ getAverage(CURRENT_PROCESSING_RATE,
vertex, metricsHistory);
+ if (Double.isNaN(avgProcRate)) {
+ return false;
+ }
+ double lagSeconds = lag / avgProcRate;
+ if (lagSeconds
+ >
conf.get(BACKLOG_PROCESSING_LAG_THRESHOLD).toSeconds()) {
+ LOG.info("Currently processing backlog at
source {}", vertex);
+ return true;
+ } else {
+ return false;
+ }
+ });
+ }
+
@NotNull
private Map<ScalingMetric, EvaluatedScalingMetric>
computeVertexScalingSummary(
Configuration conf,
HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>
scalingOutput,
SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>
metricsHistory,
JobTopology topology,
- JobVertexID vertex) {
+ JobVertexID vertex,
+ boolean processingBacklog) {
var latestVertexMetrics =
metricsHistory.get(metricsHistory.lastKey()).get(vertex);
@@ -109,7 +142,7 @@ public class ScalingMetricEvaluator {
MAX_PARALLELISM,
EvaluatedScalingMetric.of(topology.getMaxParallelisms().get(vertex)));
- computeProcessingRateThresholds(evaluatedMetrics, conf);
+ computeProcessingRateThresholds(evaluatedMetrics, conf,
processingBacklog);
var isSink = topology.getOutputs().get(vertex).isEmpty();
if (!isSink) {
@@ -130,17 +163,31 @@ public class ScalingMetricEvaluator {
@VisibleForTesting
protected static void computeProcessingRateThresholds(
- Map<ScalingMetric, EvaluatedScalingMetric> metrics, Configuration
conf) {
+ Map<ScalingMetric, EvaluatedScalingMetric> metrics,
+ Configuration conf,
+ boolean processingBacklog) {
double utilizationBoundary =
conf.getDouble(TARGET_UTILIZATION_BOUNDARY);
+ double targetUtilization = conf.get(TARGET_UTILIZATION);
+
+ double upperUtilization;
+ double lowerUtilization;
+
+ if (processingBacklog) {
+ // When we are processing backlog we allow max utilization and we
do not trigger scale
+ // down on under utilization to avoid creating more lag.
+ upperUtilization = 1.0;
+ lowerUtilization = 0.0;
+ } else {
+ upperUtilization = targetUtilization + utilizationBoundary;
+ lowerUtilization = targetUtilization - utilizationBoundary;
+ }
double scaleUpThreshold =
- AutoScalerUtils.getTargetProcessingCapacity(
- metrics, conf, conf.get(TARGET_UTILIZATION) +
utilizationBoundary, false);
+ AutoScalerUtils.getTargetProcessingCapacity(metrics, conf,
upperUtilization, false);
double scaleDownThreshold =
- AutoScalerUtils.getTargetProcessingCapacity(
- metrics, conf, conf.get(TARGET_UTILIZATION) -
utilizationBoundary, true);
+ AutoScalerUtils.getTargetProcessingCapacity(metrics, conf,
lowerUtilization, true);
metrics.put(SCALE_UP_RATE_THRESHOLD,
EvaluatedScalingMetric.of(scaleUpThreshold));
metrics.put(SCALE_DOWN_RATE_THRESHOLD,
EvaluatedScalingMetric.of(scaleDownThreshold));
@@ -155,8 +202,7 @@ public class ScalingMetricEvaluator {
Map<ScalingMetric, Double> latestVertexMetrics,
Map<ScalingMetric, EvaluatedScalingMetric> out) {
- boolean isSource = topology.getInputs().get(vertex).isEmpty();
- if (isSource) {
+ if (topology.isSource(vertex)) {
double catchUpTargetSec =
conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds();
var sourceRateMetric =
@@ -205,7 +251,7 @@ public class ScalingMetricEvaluator {
}
}
- private double getAverage(
+ private static double getAverage(
ScalingMetric metric,
JobVertexID jobVertexId,
SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>
metricsHistory) {
@@ -217,9 +263,4 @@ public class ScalingMetricEvaluator {
.filter(d -> !Double.isNaN(d))
.toArray());
}
-
- @VisibleForTesting
- protected void setClock(Clock clock) {
- this.clock = Preconditions.checkNotNull(clock);
- }
}
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
index 0d5e5435..64643433 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
@@ -121,6 +121,13 @@ public class AutoScalerOptions {
.withDescription(
"Expected restart time to be used until the
operator can determine it reliably from history.");
+ public static final ConfigOption<Duration>
BACKLOG_PROCESSING_LAG_THRESHOLD =
+ autoScalerConfig("backlog-processing.lag-threshold")
+ .durationType()
+ .defaultValue(Duration.ofMinutes(5))
+ .withDescription(
+ "Lag threshold which will prevent unnecessary
scalings while removing the pending messages responsible for the lag.");
+
public static final ConfigOption<Boolean>
SCALING_EFFECTIVENESS_DETECTION_ENABLED =
autoScalerConfig("scaling.effectiveness.detection.enabled")
.booleanType()
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
index aad95d39..1b543ed4 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
@@ -35,6 +35,9 @@ public enum ScalingMetric {
/** Output rate at full capacity (records/sec). */
TRUE_OUTPUT_RATE(true),
+ /** Current processing rate (records/sec). */
+ CURRENT_PROCESSING_RATE(true),
+
/**
* Incoming data rate to the source, e.g. rate of records written to the
Kafka topic
* (records/sec).
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
index 7e99577e..03fb3ea6 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
@@ -96,6 +96,7 @@ public class ScalingMetrics {
targetDataRate =
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC).getSum();
}
+ scalingMetrics.put(ScalingMetric.CURRENT_PROCESSING_RATE,
targetDataRate);
scalingMetrics.put(ScalingMetric.TRUE_PROCESSING_RATE, Double.NaN);
scalingMetrics.put(ScalingMetric.OUTPUT_RATIO, outputPerSecond /
targetDataRate);
var trueOutputRate = busyTimeMultiplier * outputPerSecond;
@@ -128,6 +129,7 @@ public class ScalingMetrics {
trueProcessingRate = Double.NaN;
}
scalingMetrics.put(ScalingMetric.TRUE_PROCESSING_RATE,
trueProcessingRate);
+ scalingMetrics.put(ScalingMetric.CURRENT_PROCESSING_RATE,
numRecordsInPerSecond);
} else {
LOG.error("Cannot compute true processing rate without
numRecordsInPerSecond");
}
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index 26a1c614..8dba5636 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -337,7 +337,6 @@ public class BacklogBasedScalingTest extends
OperatorTestBase {
private void setClocksTo(Instant time) {
var clock = Clock.fixed(time, ZoneId.systemDefault());
metricsCollector.setClock(clock);
- evaluator.setClock(clock);
scalingExecutor.setClock(clock);
}
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
index 6e933d1c..d4c3d6a0 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
@@ -345,7 +345,7 @@ public class JobVertexScalerTest {
metrics.put(
ScalingMetric.TRUE_PROCESSING_RATE,
new EvaluatedScalingMetric(trueProcessingRate,
trueProcessingRate));
- ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf);
+ ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf,
false);
return metrics;
}
}
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index 93a3097b..6cc0d839 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -173,7 +173,6 @@ public class MetricsCollectionAndEvaluationTest {
collectedMetrics = metricsCollector.updateMetrics(app, scalingInfo,
service, conf);
assertEquals(3, collectedMetrics.getMetricHistory().size());
- evaluator.setClock(clock);
var evaluation = evaluator.evaluate(conf, collectedMetrics);
scalingExecutor.scaleResource(app, scalingInfo, conf, evaluation);
@@ -258,7 +257,6 @@ public class MetricsCollectionAndEvaluationTest {
var clock = Clock.fixed(Instant.now().plus(Duration.ofSeconds(3)),
ZoneId.systemDefault());
metricsCollector.setClock(clock);
- evaluator.setClock(clock);
var collectedMetrics = metricsCollector.updateMetrics(app,
scalingInfo, service, conf);
@@ -267,7 +265,6 @@ public class MetricsCollectionAndEvaluationTest {
clock = Clock.fixed(Instant.now().plus(Duration.ofSeconds(3)),
ZoneId.systemDefault());
metricsCollector.setClock(clock);
- evaluator.setClock(clock);
metricsCollector.setMetricNames(
Map.of(
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
index 0bc1ecb7..cfe0695e 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
@@ -252,7 +252,7 @@ public class ScalingExecutorTest {
metrics.put(ScalingMetric.CATCH_UP_DATA_RATE,
EvaluatedScalingMetric.of(catchupRate));
metrics.put(
ScalingMetric.TRUE_PROCESSING_RATE, new
EvaluatedScalingMetric(procRate, procRate));
- ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf);
+ ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf,
false);
return metrics;
}
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
index 7a4af640..3a535763 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
@@ -42,6 +42,7 @@ import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerO
import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.OUTPUT_RATIO;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
@@ -51,6 +52,8 @@ import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMet
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_OUTPUT_RATE;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/** Scaling evaluator test. */
public class ScalingMetricEvaluatorTest {
@@ -210,16 +213,99 @@ public class ScalingMetricEvaluatorTest {
conf.set(CATCH_UP_DURATION, Duration.ofSeconds(2));
assertEquals(Tuple2.of(1128.0, 1700.0), getThresholds(700, 350, conf));
assertEquals(Tuple2.of(778.0, 1350.0), getThresholds(700, 0, conf));
+
+ // Test thresholds during catchup periods
+ assertEquals(
+ Tuple2.of(1050., Double.POSITIVE_INFINITY), getThresholds(700,
350, conf, true));
+ assertEquals(Tuple2.of(700., Double.POSITIVE_INFINITY),
getThresholds(700, 0, conf, true));
+ }
+
+ @Test
+ public void testBacklogProcessingEvaluation() {
+ var source = new JobVertexID();
+ var sink = new JobVertexID();
+ var conf = new Configuration();
+
+ var topology =
+ new JobTopology(
+ new VertexInfo(source, Collections.emptySet(), 1, 1),
+ new VertexInfo(sink, Set.of(source), 1, 1));
+
+ var metricHistory = new TreeMap<Instant, Map<JobVertexID,
Map<ScalingMetric, Double>>>();
+
+ // 0 lag
+ metricHistory.put(
+ Instant.now(),
+ Map.of(
+ source,
+ Map.of(LAG, 0., CURRENT_PROCESSING_RATE, 100.),
+ sink,
+ Map.of(TRUE_PROCESSING_RATE, 2000.)));
+ assertFalse(ScalingMetricEvaluator.isProcessingBacklog(topology,
metricHistory, conf));
+
+ // Missing lag
+ metricHistory.clear();
+ metricHistory.put(
+ Instant.now(),
+ Map.of(
+ source,
+ Map.of(CURRENT_PROCESSING_RATE, 100.),
+ sink,
+ Map.of(TRUE_PROCESSING_RATE, 2000.)));
+
+ assertFalse(ScalingMetricEvaluator.isProcessingBacklog(topology,
metricHistory, conf));
+
+ // Catch up time exceeds the lag threshold at avg proc rate (200)
+ metricHistory.put(
+ Instant.now(),
+ Map.of(
+ source,
+ Map.of(
+ LAG,
+ 250.
+ * conf.get(
+ AutoScalerOptions
+
.BACKLOG_PROCESSING_LAG_THRESHOLD)
+ .toSeconds(),
+ CURRENT_PROCESSING_RATE,
+ 300.),
+ sink,
+ Map.of(TRUE_PROCESSING_RATE, 2000.)));
+
+ assertTrue(ScalingMetricEvaluator.isProcessingBacklog(topology,
metricHistory, conf));
+
+ // Catch up time is below the lag threshold at avg proc rate (200)
+ metricHistory.put(
+ Instant.now(),
+ Map.of(
+ source,
+ Map.of(
+ LAG,
+ 180.
+ * conf.get(
+ AutoScalerOptions
+
.BACKLOG_PROCESSING_LAG_THRESHOLD)
+ .toSeconds(),
+ CURRENT_PROCESSING_RATE,
+ 200.),
+ sink,
+ Map.of(TRUE_PROCESSING_RATE, 2000.)));
+ assertFalse(ScalingMetricEvaluator.isProcessingBacklog(topology,
metricHistory, conf));
}
private Tuple2<Double, Double> getThresholds(
double inputTargetRate, double catchUpRate, Configuration conf) {
+ return getThresholds(inputTargetRate, catchUpRate, conf, false);
+ }
+
+ private Tuple2<Double, Double> getThresholds(
+ double inputTargetRate, double catchUpRate, Configuration conf,
boolean catchingUp) {
var map = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
map.put(TARGET_DATA_RATE, new EvaluatedScalingMetric(Double.NaN,
inputTargetRate));
map.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchUpRate));
- ScalingMetricEvaluator.computeProcessingRateThresholds(map, conf);
+ ScalingMetricEvaluator.computeProcessingRateThresholds(map, conf,
catchingUp);
return Tuple2.of(
map.get(SCALE_UP_RATE_THRESHOLD).getCurrent(),
map.get(SCALE_DOWN_RATE_THRESHOLD).getCurrent());
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
index 9622d0ba..55567cbe 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
@@ -73,7 +73,9 @@ public class ScalingMetricsTest {
ScalingMetric.OUTPUT_RATIO,
2.,
ScalingMetric.SOURCE_DATA_RATE,
- 1015.),
+ 1015.,
+ ScalingMetric.CURRENT_PROCESSING_RATE,
+ 1000.),
scalingMetrics);
// test negative lag growth (catch up)
@@ -101,7 +103,9 @@ public class ScalingMetricsTest {
ScalingMetric.OUTPUT_RATIO,
2.,
ScalingMetric.SOURCE_DATA_RATE,
- 950.),
+ 950.,
+ ScalingMetric.CURRENT_PROCESSING_RATE,
+ 1000.),
scalingMetrics);
scalingMetrics.clear();
@@ -126,7 +130,9 @@ public class ScalingMetricsTest {
ScalingMetric.TRUE_OUTPUT_RATE,
20000.,
ScalingMetric.OUTPUT_RATIO,
- 2.),
+ 2.,
+ ScalingMetric.CURRENT_PROCESSING_RATE,
+ 1000.),
scalingMetrics);
// Test using avg busyTime aggregator
@@ -154,7 +160,9 @@ public class ScalingMetricsTest {
ScalingMetric.TRUE_OUTPUT_RATE,
20000.,
ScalingMetric.OUTPUT_RATIO,
- 2.),
+ 2.,
+ ScalingMetric.CURRENT_PROCESSING_RATE,
+ 1000.),
scalingMetrics);
}
@@ -217,7 +225,9 @@ public class ScalingMetricsTest {
ScalingMetric.OUTPUT_RATIO,
1.,
ScalingMetric.SOURCE_DATA_RATE,
- dataRate),
+ dataRate,
+ ScalingMetric.CURRENT_PROCESSING_RATE,
+ 10.),
scalingMetrics);
}
@@ -235,6 +245,8 @@ public class ScalingMetricsTest {
ScalingMetric.OUTPUT_RATIO,
1.,
ScalingMetric.SOURCE_DATA_RATE,
+ ScalingMetrics.EFFECTIVELY_ZERO,
+ ScalingMetric.CURRENT_PROCESSING_RATE,
ScalingMetrics.EFFECTIVELY_ZERO),
scalingMetrics);
}
@@ -251,6 +263,8 @@ public class ScalingMetricsTest {
ScalingMetric.OUTPUT_RATIO,
1.,
ScalingMetric.SOURCE_DATA_RATE,
+ ScalingMetrics.EFFECTIVELY_ZERO,
+ ScalingMetric.CURRENT_PROCESSING_RATE,
ScalingMetrics.EFFECTIVELY_ZERO),
scalingMetrics);
}