This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new cc680e14 [FLINK-33306] Use observed source throughput as true
processing rate
cc680e14 is described below
commit cc680e142bb8d52c4db215658ee7f4c4159a0fe4
Author: Gyula Fora <[email protected]>
AuthorDate: Tue Oct 24 17:28:02 2023 +0200
[FLINK-33306] Use observed source throughput as true processing rate
---
.../generated/auto_scaler_configuration.html | 18 ++
.../apache/flink/autoscaler/ScalingExecutor.java | 1 -
.../flink/autoscaler/ScalingMetricCollector.java | 130 +++++++++-----
.../flink/autoscaler/ScalingMetricEvaluator.java | 99 +++++++++--
.../flink/autoscaler/config/AutoScalerOptions.java | 30 ++++
.../autoscaler/event/AutoScalerEventHandler.java | 2 +-
.../flink/autoscaler/metrics/FlinkMetric.java | 3 +-
.../flink/autoscaler/metrics/ScalingMetric.java | 3 +
.../flink/autoscaler/metrics/ScalingMetrics.java | 71 +++++++-
.../flink/autoscaler/realizer/ScalingRealizer.java | 3 +-
.../autoscaler/state/AutoScalerStateStore.java | 7 +-
.../state/InMemoryAutoScalerStateStore.java | 22 +--
.../flink/autoscaler/BacklogBasedScalingTest.java | 6 +-
.../MetricsCollectionAndEvaluationTest.java | 194 ++++++++++++++++++++-
.../autoscaler/RecommendedParallelismTest.java | 4 +-
.../autoscaler/RestApiMetricsCollectorTest.java | 18 +-
.../autoscaler/ScalingMetricCollectorTest.java | 59 +++++--
.../autoscaler/ScalingMetricEvaluatorTest.java | 119 +++++++++++++
.../autoscaler/metrics/ScalingMetricsTest.java | 109 ++++++++++--
.../operator/autoscaler/AutoscalerFactory.java | 6 +-
.../operator/autoscaler/ConfigMapStore.java | 11 +-
.../autoscaler/KubernetesAutoScalerStateStore.java | 12 +-
.../KubernetesAutoScalerStateStoreTest.java | 10 +-
23 files changed, 773 insertions(+), 164 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index 0b2d5b35..0221b567 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -56,6 +56,24 @@
<td>Duration</td>
<td>Scaling metrics aggregation window size.</td>
</tr>
+ <tr>
+
<td><h5>job.autoscaler.observed-true-processing-rate.lag-threshold</h5></td>
+ <td style="word-wrap: break-word;">30 s</td>
+ <td>Duration</td>
+ <td>Lag threshold for enabling observed true processing rate
measurements.</td>
+ </tr>
+ <tr>
+
<td><h5>job.autoscaler.observed-true-processing-rate.min-observations</h5></td>
+ <td style="word-wrap: break-word;">2</td>
+ <td>Integer</td>
+ <td>Minimum nr of observations used when estimating / switching to
observed true processing rate.</td>
+ </tr>
+ <tr>
+
<td><h5>job.autoscaler.observed-true-processing-rate.switch-threshold</h5></td>
+ <td style="word-wrap: break-word;">0.15</td>
+ <td>Double</td>
+ <td>Percentage threshold for switching to observed from busy time
based true processing rate if the measurement is off by at least the configured
fraction. For example 0.15 means we switch to observed if the busy time based
computation is at least 15% higher during catchup.</td>
+ </tr>
<tr>
<td><h5>job.autoscaler.restart.time</h5></td>
<td style="word-wrap: break-word;">3 min</td>
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index cedb4874..406b17d7 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -163,7 +163,6 @@ public class ScalingExecutor<KEY, Context extends
JobAutoScalerContext<KEY>> {
for (Map.Entry<JobVertexID, ScalingSummary> entry :
scalingSummaries.entrySet()) {
var vertex = entry.getKey();
- var scalingSummary = entry.getValue();
var metrics = evaluatedMetrics.get(vertex);
double processingRate =
metrics.get(TRUE_PROCESSING_RATE).getAverage();
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
index 013fe14d..df5f8ad6 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
@@ -59,8 +59,10 @@ import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static
org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.updateVertexList;
import static
org.apache.flink.autoscaler.utils.AutoScalerUtils.excludeVerticesFromScaling;
@@ -90,39 +92,31 @@ public abstract class ScalingMetricCollector<KEY, Context
extends JobAutoScalerC
jobKey,
(k) -> {
try {
- return
stateStore.getEvaluatedMetrics(ctx).orElse(new TreeMap<>());
+ return
stateStore.getCollectedMetrics(ctx).orElse(new TreeMap<>());
} catch (Exception exception) {
throw new RuntimeException(
"Get evaluated metrics failed.",
exception);
}
});
- // The timestamp of the first metric observation marks the start
- // If we haven't collected any metrics, we are starting now
- var metricCollectionStartTs = metricHistory.isEmpty() ? now :
metricHistory.firstKey();
-
var jobDetailsInfo =
getJobDetailsInfo(ctx,
conf.get(AutoScalerOptions.FLINK_CLIENT_TIMEOUT));
var jobUpdateTs = getJobUpdateTs(jobDetailsInfo);
- if (jobUpdateTs.isAfter(metricCollectionStartTs)) {
+ // We detect job change compared to our collected metrics by checking
against the earliest
+ // metric timestamp
+ if (!metricHistory.isEmpty() &&
jobUpdateTs.isAfter(metricHistory.firstKey())) {
LOG.info("Job updated at {}. Clearing metrics.", jobUpdateTs);
- stateStore.removeEvaluatedMetrics(ctx);
+ stateStore.removeCollectedMetrics(ctx);
cleanup(ctx.getJobKey());
metricHistory.clear();
- metricCollectionStartTs = now;
}
var topology = getJobTopology(ctx, stateStore, jobDetailsInfo);
+ var stableTime =
jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
- // Trim metrics outside the metric window from metrics history
+ // Calculate timestamp when the metric window is full
var metricWindowSize = getMetricWindowSize(conf);
- metricHistory.headMap(now.minus(metricWindowSize)).clear();
-
- var stableTime =
jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
- if (now.isBefore(stableTime)) {
- // As long as we are stabilizing, collect no metrics at all
- LOG.info("Skipping metric collection during stabilization period
until {}", stableTime);
- return new CollectedMetricHistory(topology,
Collections.emptySortedMap());
- }
+ var windowFullTime =
+ getWindowFullTime(metricHistory.tailMap(stableTime), now,
metricWindowSize);
// The filtered list of metrics we want to query for each vertex
var filteredVertexMetricNames = queryFilteredMetricNames(ctx,
topology);
@@ -136,17 +130,22 @@ public abstract class ScalingMetricCollector<KEY, Context
extends JobAutoScalerC
// Add scaling metrics to history if they were computed successfully
metricHistory.put(now, scalingMetrics);
- stateStore.storeEvaluatedMetrics(ctx, metricHistory);
- var collectedMetrics = new CollectedMetricHistory(topology,
metricHistory);
-
- var windowFullTime = metricCollectionStartTs.plus(metricWindowSize);
- collectedMetrics.setFullyCollected(!now.isBefore(windowFullTime));
+ if (now.isBefore(stableTime)) {
+ LOG.info("Stabilizing until {}", stableTime);
+ stateStore.storeCollectedMetrics(ctx, metricHistory);
+ return new CollectedMetricHistory(topology,
Collections.emptySortedMap());
+ }
- if (!collectedMetrics.isFullyCollected()) {
+ var collectedMetrics = new CollectedMetricHistory(topology,
metricHistory);
+ if (now.isBefore(windowFullTime)) {
LOG.info("Metric window not full until {}", windowFullTime);
+ } else {
+ collectedMetrics.setFullyCollected(true);
+ // Trim metrics outside the metric window from metrics history
+ metricHistory.headMap(now.minus(metricWindowSize)).clear();
}
-
+ stateStore.storeCollectedMetrics(ctx, metricHistory);
return collectedMetrics;
}
@@ -154,6 +153,15 @@ public abstract class ScalingMetricCollector<KEY, Context
extends JobAutoScalerC
return conf.get(AutoScalerOptions.METRICS_WINDOW);
}
+ private static Instant getWindowFullTime(
+ SortedMap<Instant, CollectedMetrics> metricsAfterStable,
+ Instant now,
+ Duration metricWindowSize) {
+ return metricsAfterStable.isEmpty()
+ ? now.plus(metricWindowSize)
+ : metricsAfterStable.firstKey().plus(metricWindowSize);
+ }
+
@VisibleForTesting
protected Instant getJobUpdateTs(JobDetailsInfo jobDetailsInfo) {
return Instant.ofEpochMilli(
@@ -265,9 +273,11 @@ public abstract class ScalingMetricCollector<KEY, Context
extends JobAutoScalerC
ScalingMetrics.computeLoadMetrics(
jobVertexID, vertexFlinkMetrics,
vertexScalingMetrics, conf);
+ var metricHistory =
+ histories.getOrDefault(jobKey,
Collections.emptySortedMap());
double lagGrowthRate =
computeLagGrowthRate(
- jobKey,
+ metricHistory,
jobVertexID,
vertexScalingMetrics.get(ScalingMetric.LAG));
@@ -277,8 +287,13 @@ public abstract class ScalingMetricCollector<KEY, Context
extends JobAutoScalerC
vertexScalingMetrics,
jobTopology,
lagGrowthRate,
- conf);
-
+ conf,
+ observedTprAvg(
+ jobVertexID,
+ metricHistory,
+ conf.get(
+ AutoScalerOptions
+
.OBSERVED_TRUE_PROCESSING_RATE_MIN_OBSERVATIONS)));
vertexScalingMetrics
.entrySet()
.forEach(e ->
e.setValue(ScalingMetrics.roundMetric(e.getValue())));
@@ -292,10 +307,21 @@ public abstract class ScalingMetricCollector<KEY, Context
extends JobAutoScalerC
return new CollectedMetrics(out, outputRatios);
}
- private double computeLagGrowthRate(KEY jobKey, JobVertexID jobVertexID,
Double currentLag) {
- var metricHistory = histories.get(jobKey);
+ private static Supplier<Double> observedTprAvg(
+ JobVertexID jobVertexID,
+ SortedMap<Instant, CollectedMetrics> metricHistory,
+ int minObservations) {
+ return () ->
+ ScalingMetricEvaluator.getAverage(
+ ScalingMetric.OBSERVED_TPR, jobVertexID,
metricHistory, minObservations);
+ }
+
+ private double computeLagGrowthRate(
+ SortedMap<Instant, CollectedMetrics> metricHistory,
+ JobVertexID jobVertexID,
+ Double currentLag) {
- if (metricHistory == null || metricHistory.isEmpty()) {
+ if (metricHistory.isEmpty()) {
return Double.NaN;
}
@@ -332,30 +358,39 @@ public abstract class ScalingMetricCollector<KEY, Context
extends JobAutoScalerC
&& previousMetricNames
.keySet()
.equals(topology.getParallelisms().keySet())) {
- // We have already gathered the metric names
for this topology
- return previousMetricNames;
+ var newMetricNames = new
HashMap<>(previousMetricNames);
+ var sourceMetricNames =
+ queryFilteredMetricNames(
+ ctx,
+ topology,
+
vertices.stream().filter(topology::isSource));
+ newMetricNames.putAll(sourceMetricNames);
+ return newMetricNames;
}
- try (var restClient = ctx.getRestClusterClient()) {
- return vertices.stream()
- .filter(v ->
!topology.getFinishedVertices().contains(v))
- .collect(
- Collectors.toMap(
- v -> v,
- v ->
-
getFilteredVertexMetricNames(
-
restClient,
-
ctx.getJobID(),
- v,
-
topology)));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ // Query all metric names
+ return queryFilteredMetricNames(ctx, topology,
vertices.stream());
});
names.keySet().removeAll(topology.getFinishedVertices());
return names;
}
+ private Map<JobVertexID, Map<String, FlinkMetric>>
queryFilteredMetricNames(
+ Context ctx, JobTopology topology, Stream<JobVertexID>
vertexStream) {
+ try (var restClient = ctx.getRestClusterClient()) {
+ return vertexStream
+ .filter(v -> !topology.getFinishedVertices().contains(v))
+ .collect(
+ Collectors.toMap(
+ v -> v,
+ v ->
+ getFilteredVertexMetricNames(
+ restClient,
ctx.getJobID(), v, topology)));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
/**
* Query and filter metric names for a given job vertex.
*
@@ -378,6 +413,7 @@ public abstract class ScalingMetricCollector<KEY, Context
extends JobAutoScalerC
requiredMetrics.add(FlinkMetric.BUSY_TIME_PER_SEC);
if (topology.isSource(jobVertexID)) {
+ requiredMetrics.add(FlinkMetric.BACKPRESSURE_TIME_PER_SEC);
requiredMetrics.add(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
// Pending records metric won't be available for some sources.
// The Kafka source, for instance, lazily initializes this metric
on receiving
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
index 1aec2cf2..a97ac6f1 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
@@ -48,6 +48,7 @@ import static
org.apache.flink.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSI
import static org.apache.flink.autoscaler.metrics.ScalingMetric.LAG;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.LOAD;
import static
org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.OBSERVED_TPR;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
import static
org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
import static
org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
@@ -135,9 +136,7 @@ public class ScalingMetricEvaluator {
evaluatedMetrics.put(
TRUE_PROCESSING_RATE,
- new EvaluatedScalingMetric(
- latestVertexMetrics.get(TRUE_PROCESSING_RATE),
- getAverage(TRUE_PROCESSING_RATE, vertex,
metricsHistory)));
+ evaluateTpr(metricsHistory, vertex, latestVertexMetrics,
conf));
evaluatedMetrics.put(
LOAD,
@@ -154,6 +153,57 @@ public class ScalingMetricEvaluator {
return evaluatedMetrics;
}
+ private static EvaluatedScalingMetric evaluateTpr(
+ SortedMap<Instant, CollectedMetrics> metricsHistory,
+ JobVertexID vertex,
+ Map<ScalingMetric, Double> latestVertexMetrics,
+ Configuration conf) {
+
+ var busyTimeTprAvg = getAverage(TRUE_PROCESSING_RATE, vertex,
metricsHistory);
+ var observedTprAvg =
+ getAverage(
+ OBSERVED_TPR,
+ vertex,
+ metricsHistory,
+
conf.get(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_MIN_OBSERVATIONS));
+
+ var tprMetric = selectTprMetric(vertex, conf, busyTimeTprAvg,
observedTprAvg);
+ return new EvaluatedScalingMetric(
+ latestVertexMetrics.getOrDefault(tprMetric, Double.NaN),
+ tprMetric == OBSERVED_TPR ? observedTprAvg : busyTimeTprAvg);
+ }
+
+ private static ScalingMetric selectTprMetric(
+ JobVertexID jobVertexID,
+ Configuration conf,
+ double busyTimeTprAvg,
+ double observedTprAvg) {
+
+ if (Double.isInfinite(busyTimeTprAvg) || Double.isNaN(busyTimeTprAvg))
{
+ return OBSERVED_TPR;
+ }
+
+ if (Double.isNaN(observedTprAvg)) {
+ return TRUE_PROCESSING_RATE;
+ }
+
+ double switchThreshold =
+
conf.get(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_SWITCH_THRESHOLD);
+ // If we could measure the observed tpr we decide whether to switch to
using it
+ // instead of busy time based on the error / difference between the two
+ if (busyTimeTprAvg > observedTprAvg * (1 + switchThreshold)) {
+ LOG.debug(
+ "Using observed tpr {} for {} as busy time based seems too
large ({})",
+ observedTprAvg,
+ jobVertexID,
+ busyTimeTprAvg);
+ return OBSERVED_TPR;
+ } else {
+ LOG.debug("Using busy time based tpr {} for {}.", busyTimeTprAvg,
jobVertexID);
+ return TRUE_PROCESSING_RATE;
+ }
+ }
+
@VisibleForTesting
protected static void computeProcessingRateThresholds(
Map<ScalingMetric, EvaluatedScalingMetric> metrics,
@@ -241,25 +291,40 @@ public class ScalingMetricEvaluator {
}
}
- private static double getAverage(
+ public static double getAverage(
ScalingMetric metric,
JobVertexID jobVertexId,
SortedMap<Instant, CollectedMetrics> metricsHistory) {
- double[] metricValues =
- metricsHistory.values().stream()
- .map(m -> m.getVertexMetrics().get(jobVertexId))
- .filter(m -> m.containsKey(metric))
- .mapToDouble(m -> m.get(metric))
- .filter(d -> !Double.isNaN(d))
- .toArray();
- for (double metricValue : metricValues) {
- if (Double.isInfinite(metricValue)) {
- // As long as infinite values are present, we can't properly
average. We need to
- // wait until they are evicted.
- return metricValue;
+ return getAverage(metric, jobVertexId, metricsHistory, 1);
+ }
+
+ public static double getAverage(
+ ScalingMetric metric,
+ JobVertexID jobVertexId,
+ SortedMap<Instant, CollectedMetrics> metricsHistory,
+ int minElements) {
+
+ double sum = 0;
+ int n = 0;
+ boolean anyInfinite = false;
+ for (var collectedMetrics : metricsHistory.values()) {
+ var metrics = collectedMetrics.getVertexMetrics().get(jobVertexId);
+ double num = metrics.getOrDefault(metric, Double.NaN);
+ if (Double.isNaN(num)) {
+ continue;
}
+ if (Double.isInfinite(num)) {
+ anyInfinite = true;
+ continue;
+ }
+
+ sum += num;
+ n++;
}
- return StatUtils.mean(metricValues);
+ if (n == 0) {
+ return anyInfinite ? Double.POSITIVE_INFINITY : Double.NaN;
+ }
+ return n < minElements ? Double.NaN : sum / n;
}
private static double getAverageOutputRatio(
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index 6c74b5d2..626921bc 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -151,6 +151,36 @@ public class AutoScalerOptions {
.withDescription(
"Lag threshold which will prevent unnecessary
scalings while removing the pending messages responsible for the lag.");
+ public static final ConfigOption<Duration>
OBSERVE_TRUE_PROCESSING_RATE_LAG_THRESHOLD =
+ autoScalerConfig("observed-true-processing-rate.lag-threshold")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(30))
+ .withDeprecatedKeys(
+ deprecatedOperatorConfigKey(
+
"observed-true-processing-rate.lag-threshold"))
+ .withDescription(
+ "Lag threshold for enabling observed true
processing rate measurements.");
+
+ public static final ConfigOption<Double>
OBSERVED_TRUE_PROCESSING_RATE_SWITCH_THRESHOLD =
+ autoScalerConfig("observed-true-processing-rate.switch-threshold")
+ .doubleType()
+ .defaultValue(0.15)
+ .withDeprecatedKeys(
+ deprecatedOperatorConfigKey(
+
"observed-true-processing-rate.switch-threshold"))
+ .withDescription(
+ "Percentage threshold for switching to observed
from busy time based true processing rate if the measurement is off by at least
the configured fraction. For example 0.15 means we switch to observed if the
busy time based computation is at least 15% higher during catchup.");
+
+ public static final ConfigOption<Integer>
OBSERVED_TRUE_PROCESSING_RATE_MIN_OBSERVATIONS =
+ autoScalerConfig("observed-true-processing-rate.min-observations")
+ .intType()
+ .defaultValue(2)
+ .withDeprecatedKeys(
+ deprecatedOperatorConfigKey(
+
"observed-true-processing-rate.min-observations"))
+ .withDescription(
+ "Minimum nr of observations used when estimating /
switching to observed true processing rate.");
+
public static final ConfigOption<Boolean>
SCALING_EFFECTIVENESS_DETECTION_ENABLED =
autoScalerConfig("scaling.effectiveness.detection.enabled")
.booleanType()
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
index 9fafc686..a5a0edfe 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
@@ -25,7 +25,7 @@ import javax.annotation.Nullable;
import java.time.Duration;
/**
- * Handler all loggable events during scaling.
+ * Handler for autoscaler events.
*
* @param <KEY> The job key.
* @param <Context> Instance of JobAutoScalerContext.
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
index 7f28d947..398f2ee3 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
@@ -38,7 +38,8 @@ public enum FlinkMetric {
s -> s.startsWith("Source__") &&
s.endsWith(".numRecordsOutPerSecond")),
SOURCE_TASK_NUM_RECORDS_IN_PER_SEC(
s -> s.startsWith("Source__") &&
s.endsWith(".numRecordsInPerSecond")),
- PENDING_RECORDS(s -> s.endsWith(".pendingRecords"));
+ PENDING_RECORDS(s -> s.endsWith(".pendingRecords")),
+ BACKPRESSURE_TIME_PER_SEC(s -> s.equals("backPressuredTimeMsPerSecond"));
public static final Map<FlinkMetric, AggregatedMetric> FINISHED_METRICS =
Map.of(
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
index b6a3e17d..2e7e52ee 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
@@ -29,6 +29,9 @@ public enum ScalingMetric {
/** Processing rate at full capacity (records/sec). */
TRUE_PROCESSING_RATE(true),
+ /** Observed true processing rate for sources. */
+ OBSERVED_TPR(true),
+
/** Current processing rate. */
CURRENT_PROCESSING_RATE(true),
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
index afaa21ae..ed82277b 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
@@ -30,6 +30,8 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
/** Utilities for computing scaling metrics based on Flink metrics. */
public class ScalingMetrics {
@@ -52,11 +54,11 @@ public class ScalingMetrics {
Map<ScalingMetric, Double> scalingMetrics,
JobTopology topology,
double lagGrowthRate,
- Configuration conf) {
+ Configuration conf,
+ Supplier<Double> observedTprAvg) {
var isSource = topology.isSource(jobVertexID);
- double busyTimeMsPerSecond = getBusyTimeMsPerSecond(flinkMetrics,
conf, jobVertexID);
double numRecordsInPerSecond =
getNumRecordsInPerSecond(flinkMetrics, jobVertexID, isSource);
@@ -68,7 +70,15 @@ public class ScalingMetrics {
}
if (!Double.isNaN(numRecordsInPerSecond)) {
- double trueProcessingRate = computeTrueRate(numRecordsInPerSecond,
busyTimeMsPerSecond);
+ double busyTimeMsPerSecond = getBusyTimeMsPerSecond(flinkMetrics,
conf, jobVertexID);
+ double trueProcessingRate =
+ computeTprFromBusyTime(conf, numRecordsInPerSecond,
busyTimeMsPerSecond);
+ if (isSource) {
+ var observedTprOpt =
+ getObservedTpr(flinkMetrics, scalingMetrics,
numRecordsInPerSecond, conf)
+ .orElseGet(observedTprAvg);
+ scalingMetrics.put(ScalingMetric.OBSERVED_TPR, observedTprOpt);
+ }
scalingMetrics.put(ScalingMetric.TRUE_PROCESSING_RATE,
trueProcessingRate);
scalingMetrics.put(ScalingMetric.CURRENT_PROCESSING_RATE,
numRecordsInPerSecond);
} else {
@@ -78,6 +88,45 @@ public class ScalingMetrics {
}
}
+ private static Optional<Double> getObservedTpr(
+ Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+ Map<ScalingMetric, Double> scalingMetrics,
+ double numRecordsInPerSecond,
+ Configuration conf) {
+
+ // If there are no incoming records we return infinity to allow scale
down
+ if (numRecordsInPerSecond == 0) {
+ return Optional.of(Double.POSITIVE_INFINITY);
+ }
+
+ // We only measure observed tpr when we are catching up, that is when
the lag is beyond the
+ // configured observe threshold
+ boolean catchingUp =
+ scalingMetrics.getOrDefault(ScalingMetric.LAG, 0.)
+ >=
conf.get(AutoScalerOptions.OBSERVE_TRUE_PROCESSING_RATE_LAG_THRESHOLD)
+ .toSeconds()
+ * numRecordsInPerSecond;
+ if (!catchingUp) {
+ return Optional.empty();
+ }
+
+ double observedTpr =
+ computeObservedTprWithBackpressure(
+ numRecordsInPerSecond,
+
flinkMetrics.get(FlinkMetric.BACKPRESSURE_TIME_PER_SEC).getAvg());
+
+ return Double.isNaN(observedTpr) ? Optional.empty() :
Optional.of(observedTpr);
+ }
+
+ public static double computeObservedTprWithBackpressure(
+ double numRecordsInPerSecond, double backpressureMsPerSeconds) {
+ if (backpressureMsPerSeconds >= 1000) {
+ return Double.NaN;
+ }
+ double nonBackpressuredRate = (1 - (backpressureMsPerSeconds / 1000));
+ return numRecordsInPerSecond / nonBackpressuredRate;
+ }
+
public static Map<Edge, Double> computeOutputRatios(
Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> flinkMetrics,
JobTopology topology) {
@@ -136,10 +185,9 @@ public class ScalingMetrics {
"No busyTimeMsPerSecond metric available for {}. No
scaling will be performed for this vertex.",
jobVertexId);
}
- // Pretend that the load is balanced because we don't know any
better
- busyTimeMsPerSecond =
conf.get(AutoScalerOptions.TARGET_UTILIZATION) * 1000;
+ return Double.NaN;
}
- return busyTimeMsPerSecond;
+ return Math.max(0, busyTimeMsPerSecond);
}
private static double getNumRecordsInPerSecond(
@@ -159,7 +207,7 @@ public class ScalingMetrics {
LOG.warn("Received null input rate for {}. Returning NaN.",
jobVertexID);
return Double.NaN;
}
- return numRecordsInPerSecond.getSum();
+ return Math.max(0, numRecordsInPerSecond.getSum());
}
private static double getNumRecordsOutPerSecond(
@@ -225,12 +273,17 @@ public class ScalingMetrics {
return getNumRecordsOutPerSecond(fromMetrics, from);
}
- private static double computeTrueRate(double rate, double
busyTimeMsPerSecond) {
- if (rate <= 0 || busyTimeMsPerSecond <= 0) {
+ private static double computeTprFromBusyTime(
+ Configuration conf, double rate, double busyTimeMsPerSecond) {
+ if (rate == 0) {
// Nothing is coming in, we assume infinite processing power
// until we can sample the true processing rate (i.e. data flows).
return Double.POSITIVE_INFINITY;
}
+ // Pretend that the load is balanced because we don't know any better
+ if (Double.isNaN(busyTimeMsPerSecond)) {
+ busyTimeMsPerSecond =
conf.get(AutoScalerOptions.TARGET_UTILIZATION) * 1000;
+ }
return rate / (busyTimeMsPerSecond / 1000);
}
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
index 36184dad..b4895648 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
@@ -23,7 +23,8 @@ import org.apache.flink.autoscaler.JobAutoScalerContext;
import java.util.Map;
/**
- * The Scaling Realizer is responsible for managing scaling actions.
+ * The Scaling Realizer is responsible for applying scaling actions, i.e.
actually rescaling the
+ * jobs.
*
* @param <KEY> The job key.
* @param <Context> Instance of JobAutoScalerContext.
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
index 4551ef54..b0255506 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
@@ -46,14 +46,13 @@ public interface AutoScalerStateStore<KEY, Context extends
JobAutoScalerContext<
void removeScalingHistory(Context jobContext) throws Exception;
- void storeEvaluatedMetrics(
- Context jobContext, SortedMap<Instant, CollectedMetrics>
evaluatedMetrics)
+ void storeCollectedMetrics(Context jobContext, SortedMap<Instant,
CollectedMetrics> metrics)
throws Exception;
- Optional<SortedMap<Instant, CollectedMetrics>> getEvaluatedMetrics(Context
jobContext)
+ Optional<SortedMap<Instant, CollectedMetrics>> getCollectedMetrics(Context
jobContext)
throws Exception;
- void removeEvaluatedMetrics(Context jobContext) throws Exception;
+ void removeCollectedMetrics(Context jobContext) throws Exception;
void storeParallelismOverrides(Context jobContext, Map<String, String>
parallelismOverrides)
throws Exception;
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
index 2685db91..1cb74d44 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
@@ -29,7 +29,7 @@ import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
/**
- * The state store based on the Java Heap, the state will be discarded after
process restarts.
+ * State store based on the Java Heap, the state will be discarded after
process restarts.
*
* @param <KEY> The job key.
* @param <Context> The job autoscaler context.
@@ -40,13 +40,13 @@ public class InMemoryAutoScalerStateStore<KEY, Context
extends JobAutoScalerCont
private final Map<KEY, Map<JobVertexID, SortedMap<Instant,
ScalingSummary>>>
scalingHistoryStore;
- private final Map<KEY, SortedMap<Instant, CollectedMetrics>>
evaluatedMetricsStore;
+ private final Map<KEY, SortedMap<Instant, CollectedMetrics>>
collectedMetricsStore;
private final Map<KEY, Map<String, String>> parallelismOverridesStore;
public InMemoryAutoScalerStateStore() {
scalingHistoryStore = new ConcurrentHashMap<>();
- evaluatedMetricsStore = new ConcurrentHashMap<>();
+ collectedMetricsStore = new ConcurrentHashMap<>();
parallelismOverridesStore = new ConcurrentHashMap<>();
}
@@ -69,19 +69,19 @@ public class InMemoryAutoScalerStateStore<KEY, Context
extends JobAutoScalerCont
}
@Override
- public void storeEvaluatedMetrics(
- Context jobContext, SortedMap<Instant, CollectedMetrics>
evaluatedMetrics) {
- evaluatedMetricsStore.put(jobContext.getJobKey(), evaluatedMetrics);
+ public void storeCollectedMetrics(
+ Context jobContext, SortedMap<Instant, CollectedMetrics> metrics) {
+ collectedMetricsStore.put(jobContext.getJobKey(), metrics);
}
@Override
- public Optional<SortedMap<Instant, CollectedMetrics>>
getEvaluatedMetrics(Context jobContext) {
- return
Optional.ofNullable(evaluatedMetricsStore.get(jobContext.getJobKey()));
+ public Optional<SortedMap<Instant, CollectedMetrics>>
getCollectedMetrics(Context jobContext) {
+ return
Optional.ofNullable(collectedMetricsStore.get(jobContext.getJobKey()));
}
@Override
- public void removeEvaluatedMetrics(Context jobContext) {
- evaluatedMetricsStore.remove(jobContext.getJobKey());
+ public void removeCollectedMetrics(Context jobContext) {
+ collectedMetricsStore.remove(jobContext.getJobKey());
}
@Override
@@ -108,7 +108,7 @@ public class InMemoryAutoScalerStateStore<KEY, Context
extends JobAutoScalerCont
@Override
public void removeInfoFromCache(KEY jobKey) {
scalingHistoryStore.remove(jobKey);
- evaluatedMetricsStore.remove(jobKey);
+ collectedMetricsStore.remove(jobKey);
parallelismOverridesStore.remove(jobKey);
}
}
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
index 0128f3ca..3e5c6fa1 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
@@ -368,7 +368,7 @@ public class BacklogBasedScalingTest {
"", Double.NaN, Double.NaN,
Double.NaN, 500.))));
autoscaler.scale(context);
- assertFalse(stateStore.getEvaluatedMetrics(context).get().isEmpty());
+ assertFalse(stateStore.getCollectedMetrics(context).get().isEmpty());
}
@Test
@@ -392,13 +392,13 @@ public class BacklogBasedScalingTest {
setClocksTo(now);
metricsCollector.setJobUpdateTs(now);
autoscaler.scale(context);
- assertTrue(stateStore.getEvaluatedMetrics(context).isEmpty());
+ assertTrue(autoscaler.lastEvaluatedMetrics.isEmpty());
assertTrue(eventCollector.events.isEmpty());
}
private void assertEvaluatedMetricsSize(int expectedSize) {
Optional<SortedMap<Instant, CollectedMetrics>> evaluatedMetricsOpt =
- stateStore.getEvaluatedMetrics(context);
+ stateStore.getCollectedMetrics(context);
assertThat(evaluatedMetricsOpt).isPresent();
assertThat(evaluatedMetricsOpt.get()).hasSize(expectedSize);
}
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
index c36d6e10..24cc9e2b 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.TestingEventCollector;
import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.FlinkMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
@@ -41,6 +42,7 @@ import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -121,7 +123,7 @@ public class MetricsCollectionAndEvaluationTest {
clock = Clock.offset(clock,
conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
metricsCollector.setClock(clock);
collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
- assertEquals(1, collectedMetrics.getMetricHistory().size());
+ assertEquals(2, collectedMetrics.getMetricHistory().size());
assertFalse(collectedMetrics.isFullyCollected());
// We haven't collected a full window yet
@@ -129,7 +131,7 @@ public class MetricsCollectionAndEvaluationTest {
clock = Clock.offset(clock, Duration.ofSeconds(1));
metricsCollector.setClock(clock);
collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
- assertEquals(2, collectedMetrics.getMetricHistory().size());
+ assertEquals(3, collectedMetrics.getMetricHistory().size());
assertFalse(collectedMetrics.isFullyCollected());
// Advance time to stabilization period + full window => metrics
should be present
@@ -294,13 +296,13 @@ public class MetricsCollectionAndEvaluationTest {
// This call will lead to metric collection but we haven't reached the
window size yet
// which will hold back metrics
metricsHistory = metricsCollector.updateMetrics(context, stateStore);
- assertEquals(1, metricsHistory.getMetricHistory().size());
+ assertEquals(3, metricsHistory.getMetricHistory().size());
assertFalse(metricsHistory.isFullyCollected());
// Collect more values in window
metricsCollector.setClock(Clock.offset(clock, Duration.ofSeconds(1)));
metricsHistory = metricsCollector.updateMetrics(context, stateStore);
- assertEquals(2, metricsHistory.getMetricHistory().size());
+ assertEquals(4, metricsHistory.getMetricHistory().size());
assertFalse(metricsHistory.isFullyCollected());
// Window size reached
@@ -418,14 +420,177 @@ public class MetricsCollectionAndEvaluationTest {
0.,
ScalingMetric.TRUE_PROCESSING_RATE,
Double.POSITIVE_INFINITY,
+ ScalingMetric.OBSERVED_TPR,
+ Double.POSITIVE_INFINITY,
ScalingMetric.LOAD,
0.),
finishedMetrics);
}
+ @Test
+ public void testObservedTprCollection() throws Exception {
+ var source = new JobVertexID();
+ var topology = new JobTopology(new VertexInfo(source, Set.of(), 10,
720));
+ Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> metrics =
+ Map.of(
+ source,
+ new HashMap<>(
+ Map.of(
+ FlinkMetric.PENDING_RECORDS,
+ new AggregatedMetric(
+ "", Double.NaN, Double.NaN,
Double.NaN, 1000000.),
+ FlinkMetric.BACKPRESSURE_TIME_PER_SEC,
+ new AggregatedMetric(
+ "", Double.NaN, Double.NaN,
600., Double.NaN),
+ FlinkMetric.BUSY_TIME_PER_SEC,
+ new AggregatedMetric(
+ "", Double.NaN, 200.,
Double.NaN, Double.NaN),
+ FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+ new AggregatedMetric(
+ "", Double.NaN, Double.NaN,
Double.NaN, 1000.),
+
FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
+ new AggregatedMetric(
+ "", Double.NaN, Double.NaN,
Double.NaN, 500.))));
+
+ metricsCollector = new TestingMetricsCollector(topology);
+ metricsCollector.setJobUpdateTs(startTime);
+ metricsCollector.setCurrentMetrics(metrics);
+
+
context.getConfiguration().set(AutoScalerOptions.STABILIZATION_INTERVAL,
Duration.ZERO);
+ metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(100),
ZoneId.systemDefault()));
+ var collectedMetrics =
+ metricsCollector
+ .updateMetrics(context, stateStore)
+ .getMetricHistory()
+ .get(Instant.ofEpochMilli(100))
+ .getVertexMetrics()
+ .get(source);
+
+ // Make sure both busy time based and observed TPR are collected
+ assertEquals(2500.,
collectedMetrics.get(ScalingMetric.TRUE_PROCESSING_RATE));
+ assertEquals(500. / 0.4,
collectedMetrics.get(ScalingMetric.OBSERVED_TPR));
+
+ // Make sure that average observed tpr is picked up only if 2 valid
observations
+ // We set no lag so observed cannot be computed and expect nan
+ metrics.get(source)
+ .put(
+ FlinkMetric.PENDING_RECORDS,
+ new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, 0.));
+ metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(200),
ZoneId.systemDefault()));
+ collectedMetrics =
+ metricsCollector
+ .updateMetrics(context, stateStore)
+ .getMetricHistory()
+ .get(Instant.ofEpochMilli(200))
+ .getVertexMetrics()
+ .get(source);
+
+ // Make sure observed TPR is NaN as there is only one valid observation so far
+ assertEquals(Double.NaN,
collectedMetrics.get(ScalingMetric.OBSERVED_TPR));
+
+ // Add another valid observed computation
+ metrics.get(source)
+ .put(
+ FlinkMetric.PENDING_RECORDS,
+ new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, 100000.));
+ metrics.get(source)
+ .put(
+ FlinkMetric.BACKPRESSURE_TIME_PER_SEC,
+ new AggregatedMetric("", Double.NaN, Double.NaN, 500.,
Double.NaN));
+ metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(300),
ZoneId.systemDefault()));
+ collectedMetrics =
+ metricsCollector
+ .updateMetrics(context, stateStore)
+ .getMetricHistory()
+ .get(Instant.ofEpochMilli(300))
+ .getVertexMetrics()
+ .get(source);
+
+ assertEquals(500. / 0.5,
collectedMetrics.get(ScalingMetric.OBSERVED_TPR));
+ // With no lag again, make sure the average of the two valid observations is used
+ metrics.get(source)
+ .put(
+ FlinkMetric.PENDING_RECORDS,
+ new AggregatedMetric("", Double.NaN, Double.NaN,
Double.NaN, 0.));
+
+ metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(400),
ZoneId.systemDefault()));
+ collectedMetrics =
+ metricsCollector
+ .updateMetrics(context, stateStore)
+ .getMetricHistory()
+ .get(Instant.ofEpochMilli(400))
+ .getVertexMetrics()
+ .get(source);
+
+ assertEquals(
+ ((500. / 0.5) + (500. / 0.4)) / 2,
+ collectedMetrics.get(ScalingMetric.OBSERVED_TPR));
+ }
+
+ @Test
+ public void testMetricCollectionDuringStabilization() throws Exception {
+ var source = new JobVertexID();
+ var topology = new JobTopology(new VertexInfo(source, Set.of(), 10,
720));
+ Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> metrics =
+ Map.of(
+ source,
+ new HashMap<>(
+ Map.of(
+ FlinkMetric.PENDING_RECORDS,
+ new AggregatedMetric(
+ "", Double.NaN, Double.NaN,
Double.NaN, 1000000.),
+ FlinkMetric.BACKPRESSURE_TIME_PER_SEC,
+ new AggregatedMetric(
+ "", Double.NaN, Double.NaN,
600., Double.NaN),
+ FlinkMetric.BUSY_TIME_PER_SEC,
+ new AggregatedMetric(
+ "", Double.NaN, 200.,
Double.NaN, Double.NaN),
+ FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+ new AggregatedMetric(
+ "", Double.NaN, Double.NaN,
Double.NaN, 1000.),
+
FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
+ new AggregatedMetric(
+ "", Double.NaN, Double.NaN,
Double.NaN, 500.))));
+
+ metricsCollector = new TestingMetricsCollector(topology);
+ metricsCollector.setJobUpdateTs(startTime);
+ metricsCollector.setCurrentMetrics(metrics);
+
+ context.getConfiguration()
+ .set(AutoScalerOptions.STABILIZATION_INTERVAL,
Duration.ofMillis(100));
+ context.getConfiguration().set(AutoScalerOptions.METRICS_WINDOW,
Duration.ofMillis(100));
+
+ // Within stabilization period we simply collect metrics but do not
return them
+ metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(50),
ZoneId.systemDefault()));
+ assertTrue(
+ metricsCollector.updateMetrics(context,
stateStore).getMetricHistory().isEmpty());
+ assertEquals(1, stateStore.getCollectedMetrics(context).get().size());
+ metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(60),
ZoneId.systemDefault()));
+ assertTrue(
+ metricsCollector.updateMetrics(context,
stateStore).getMetricHistory().isEmpty());
+ assertEquals(2, stateStore.getCollectedMetrics(context).get().size());
+
+ // Until window is full (time=200) we keep returning stabilizing
metrics
+ metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(150),
ZoneId.systemDefault()));
+ assertEquals(
+ 3, metricsCollector.updateMetrics(context,
stateStore).getMetricHistory().size());
+ assertEquals(3, stateStore.getCollectedMetrics(context).get().size());
+
+ metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(180),
ZoneId.systemDefault()));
+ assertEquals(
+ 4, metricsCollector.updateMetrics(context,
stateStore).getMetricHistory().size());
+ assertEquals(4, stateStore.getCollectedMetrics(context).get().size());
+
+ // Once we reach full time we trim the stabilization metrics
+ metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(260),
ZoneId.systemDefault()));
+ assertEquals(
+ 2, metricsCollector.updateMetrics(context,
stateStore).getMetricHistory().size());
+ assertEquals(2, stateStore.getCollectedMetrics(context).get().size());
+ }
+
@Test
public void testScaleDownWithZeroProcessingRate() throws Exception {
- var topology = new JobTopology(new VertexInfo(source1, Set.of(), 10,
720));
+ var topology = new JobTopology(new VertexInfo(source1, Set.of(), 2,
720));
metricsCollector = new TestingMetricsCollector<>(topology);
metricsCollector.setJobUpdateTs(startTime);
@@ -448,7 +613,7 @@ public class MetricsCollectionAndEvaluationTest {
assertEquals(0,
evaluation.get(source1).get(ScalingMetric.TARGET_DATA_RATE).getCurrent());
assertEquals(
Double.POSITIVE_INFINITY,
-
evaluation.get(source1).get(ScalingMetric.TRUE_PROCESSING_RATE).getCurrent());
+
evaluation.get(source1).get(ScalingMetric.TRUE_PROCESSING_RATE).getAverage());
assertEquals(
0.,
evaluation.get(source1).get(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD).getCurrent());
@@ -459,6 +624,23 @@ public class MetricsCollectionAndEvaluationTest {
scalingExecutor.scaleResource(context, evaluation);
var scaledParallelism =
ScalingExecutorTest.getScaledParallelism(stateStore, context);
assertEquals(1, scaledParallelism.get(source1));
+
+ // Make sure if there are measurements with non-infinite TPR, we don't
evaluate infinite
+ var lastCollected =
collectedMetrics.getMetricHistory().values().iterator().next();
+ var newMetrics = new HashMap<>(lastCollected.getVertexMetrics());
+ newMetrics.get(source1).put(ScalingMetric.TRUE_PROCESSING_RATE, 3.);
+ newMetrics.get(source1).put(ScalingMetric.OBSERVED_TPR, 3.);
+ newMetrics.get(source1).put(ScalingMetric.SOURCE_DATA_RATE, 2.);
+
+ collectedMetrics
+ .getMetricHistory()
+ .put(
+ Instant.ofEpochSecond(1234),
+ new CollectedMetrics(newMetrics,
lastCollected.getOutputRatios()));
+
+ evaluation = evaluator.evaluate(context.getConfiguration(),
collectedMetrics);
+ assertEquals(
+ 3.,
evaluation.get(source1).get(ScalingMetric.TRUE_PROCESSING_RATE).getAverage());
}
private CollectedMetricHistory collectMetrics() throws Exception {
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
index da41c015..c4b714ef 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
@@ -205,7 +205,7 @@ public class RecommendedParallelismTest {
// after restart while the job is not running the evaluated metrics
are gone
autoscaler.scale(context);
- assertEquals(3, stateStore.getEvaluatedMetrics(context).get().size());
+ assertEquals(3, stateStore.getCollectedMetrics(context).get().size());
assertNull(autoscaler.lastEvaluatedMetrics.get(context.getJobKey()));
scaledParallelism =
ScalingExecutorTest.getScaledParallelism(stateStore, context);
assertEquals(4, scaledParallelism.get(source));
@@ -230,7 +230,7 @@ public class RecommendedParallelismTest {
private void assertEvaluatedMetricsSize(int expectedSize) {
Optional<SortedMap<Instant, CollectedMetrics>> evaluatedMetricsOpt =
- stateStore.getEvaluatedMetrics(context);
+ stateStore.getCollectedMetrics(context);
assertThat(evaluatedMetricsOpt).isPresent();
assertThat(evaluatedMetricsOpt.get()).hasSize(expectedSize);
}
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
index 128c4094..a74ac295 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
@@ -46,17 +46,16 @@ public class RestApiMetricsCollectorTest {
@Test
public void testAggregateMultiplePendingRecordsMetricsPerSource() throws
Exception {
- RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>> collector =
- new RestApiMetricsCollector<>();
+ var collector = new RestApiMetricsCollector<JobID,
JobAutoScalerContext<JobID>>();
JobVertexID jobVertexID = new JobVertexID();
- Map<String, FlinkMetric> flinkMetrics =
+ var flinkMetrics =
Map.of(
"a.pendingRecords", FlinkMetric.PENDING_RECORDS,
"b.pendingRecords", FlinkMetric.PENDING_RECORDS);
- Map<JobVertexID, Map<String, FlinkMetric>> metrics =
Map.of(jobVertexID, flinkMetrics);
+ var metrics = Map.of(jobVertexID, flinkMetrics);
- List<AggregatedMetric> aggregatedMetricsResponse =
+ var aggregatedMetricsResponse =
List.of(
new AggregatedMetric(
"a.pendingRecords", Double.NaN, Double.NaN,
Double.NaN, 100.),
@@ -65,8 +64,8 @@ public class RestApiMetricsCollectorTest {
new AggregatedMetric(
"c.unrelated", Double.NaN, Double.NaN,
Double.NaN, 100.));
- Configuration conf = new Configuration();
- RestClusterClient<String> restClusterClient =
+ var conf = new Configuration();
+ var restClusterClient =
new RestClusterClient<>(
conf,
"test-cluster",
@@ -91,7 +90,7 @@ public class RestApiMetricsCollectorTest {
};
JobID jobID = new JobID();
- JobAutoScalerContext<JobID> context =
+ var context =
new JobAutoScalerContext<>(
jobID,
jobID,
@@ -100,8 +99,7 @@ public class RestApiMetricsCollectorTest {
new UnregisteredMetricsGroup(),
() -> restClusterClient);
- Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> jobVertexIDMapMap
=
- collector.queryAllAggregatedMetrics(context, metrics);
+ var jobVertexIDMapMap = collector.queryAllAggregatedMetrics(context,
metrics);
Assertions.assertEquals(1, jobVertexIDMapMap.size());
Map<FlinkMetric, AggregatedMetric> vertexMetrics =
jobVertexIDMapMap.get(jobVertexID);
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
index 57fe3de9..724aa218 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
@@ -36,11 +36,10 @@ import org.junit.jupiter.api.Test;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -90,30 +89,66 @@ public class ScalingMetricCollectorTest {
@Test
public void testQueryNamesOnTopologyChange() {
- var metricNameQueryCounter = new AtomicInteger(0);
- RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>> collector =
- new RestApiMetricsCollector<>() {
+ var metricNameQueryCounter = new HashMap<JobVertexID, Integer>();
+ var collector =
+ new RestApiMetricsCollector<JobID,
JobAutoScalerContext<JobID>>() {
@Override
protected Map<String, FlinkMetric>
getFilteredVertexMetricNames(
RestClusterClient<?> rc, JobID id, JobVertexID
jvi, JobTopology t) {
- metricNameQueryCounter.incrementAndGet();
+ metricNameQueryCounter.compute(jvi, (j, c) -> c + 1);
return Map.of();
}
};
- var t1 = new JobTopology(new VertexInfo(new JobVertexID(),
Collections.emptySet(), 1, 1));
- var t2 = new JobTopology(new VertexInfo(new JobVertexID(),
Collections.emptySet(), 1, 1));
+ var source = new JobVertexID();
+ var source2 = new JobVertexID();
+ var sink = new JobVertexID();
+ metricNameQueryCounter.put(source, 0);
+ metricNameQueryCounter.put(source2, 0);
+ metricNameQueryCounter.put(sink, 0);
+
+ var t1 =
+ new JobTopology(
+ new VertexInfo(source, Set.of(), 1, 1),
+ new VertexInfo(sink, Set.of(source), 1, 1));
+
+ var t2 =
+ new JobTopology(
+ new VertexInfo(source2, Set.of(), 1, 1),
+ new VertexInfo(sink, Set.of(source2), 1, 1));
collector.queryFilteredMetricNames(context, t1);
- assertEquals(1, metricNameQueryCounter.get());
+ assertEquals(1, metricNameQueryCounter.get(source));
+ assertEquals(0, metricNameQueryCounter.get(source2));
+ assertEquals(1, metricNameQueryCounter.get(sink));
+
collector.queryFilteredMetricNames(context, t1);
collector.queryFilteredMetricNames(context, t1);
- assertEquals(1, metricNameQueryCounter.get());
+ // Make sure source metrics are refreshed
+ assertEquals(3, metricNameQueryCounter.get(source));
+ assertEquals(0, metricNameQueryCounter.get(source2));
+ assertEquals(1, metricNameQueryCounter.get(sink));
+
+ // Topology change
collector.queryFilteredMetricNames(context, t2);
- assertEquals(2, metricNameQueryCounter.get());
+ assertEquals(3, metricNameQueryCounter.get(source));
+ assertEquals(1, metricNameQueryCounter.get(source2));
+ assertEquals(2, metricNameQueryCounter.get(sink));
+
collector.queryFilteredMetricNames(context, t2);
+ assertEquals(3, metricNameQueryCounter.get(source));
+ assertEquals(2, metricNameQueryCounter.get(source2));
+ assertEquals(2, metricNameQueryCounter.get(sink));
+
+ // Mark source finished, should not be queried again
+ t2 =
+ new JobTopology(
+ new VertexInfo(source2, Set.of(), 1, 1, true),
+ new VertexInfo(sink, Set.of(source2), 1, 1));
collector.queryFilteredMetricNames(context, t2);
- assertEquals(2, metricNameQueryCounter.get());
+ assertEquals(3, metricNameQueryCounter.get(source));
+ assertEquals(2, metricNameQueryCounter.get(source2));
+ assertEquals(2, metricNameQueryCounter.get(sink));
}
@Test
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
index 13b19445..ce7c5cb5 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
@@ -316,6 +316,125 @@ public class ScalingMetricEvaluatorTest {
assertFalse(ScalingMetricEvaluator.isProcessingBacklog(topology,
metricHistory, conf));
}
+ @Test
+ public void testObservedTprEvaluation() {
+ var source = new JobVertexID();
+ var conf = new Configuration();
+
+ var topology = new JobTopology(new VertexInfo(source,
Collections.emptySet(), 1, 1));
+ var evaluator = new ScalingMetricEvaluator();
+
+ var metricHistory = new TreeMap<Instant, CollectedMetrics>();
+ metricHistory.put(
+ Instant.ofEpochMilli(100),
+ new CollectedMetrics(
+ Map.of(
+ source,
+ Map.of(
+ ScalingMetric.LAG,
+ 0.,
+ ScalingMetric.TRUE_PROCESSING_RATE,
+ 300.,
+ ScalingMetric.OBSERVED_TPR,
+ 200.,
+ ScalingMetric.CURRENT_PROCESSING_RATE,
+ 100.,
+ ScalingMetric.SOURCE_DATA_RATE,
+ 50.,
+ ScalingMetric.LOAD,
+ 10.)),
+ Map.of()));
+ metricHistory.put(
+ Instant.ofEpochMilli(200),
+ new CollectedMetrics(
+ Map.of(
+ source,
+ Map.of(
+ ScalingMetric.LAG,
+ 0.,
+ ScalingMetric.TRUE_PROCESSING_RATE,
+ 400.,
+ ScalingMetric.OBSERVED_TPR,
+ 400.,
+ ScalingMetric.CURRENT_PROCESSING_RATE,
+ 100.,
+ ScalingMetric.SOURCE_DATA_RATE,
+ 50.,
+ ScalingMetric.LOAD,
+ 10.)),
+ Map.of()));
+
+ // Observed TPR average : 300
+ // Busy Time TPR average: 350
+
+ // Set diff threshold to 20% -> within threshold
+
conf.set(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_SWITCH_THRESHOLD, 0.2);
+
+ // Test that we used busy time based TPR
+ assertEquals(
+ new EvaluatedScalingMetric(400., 350.),
+ evaluator
+ .evaluate(conf, new CollectedMetricHistory(topology,
metricHistory))
+ .get(source)
+ .get(ScalingMetric.TRUE_PROCESSING_RATE));
+
+ // Set diff threshold to 10% -> outside threshold
+
conf.set(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_SWITCH_THRESHOLD, 0.1);
+
+ // Test that we used the observed TPR
+ assertEquals(
+ new EvaluatedScalingMetric(400, 300.),
+ evaluator
+ .evaluate(conf, new CollectedMetricHistory(topology,
metricHistory))
+ .get(source)
+ .get(ScalingMetric.TRUE_PROCESSING_RATE));
+
+ // Test that observed tpr min observations are respected. If less, use
busy time
+
conf.set(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_MIN_OBSERVATIONS, 3);
+ assertEquals(
+ new EvaluatedScalingMetric(400., 350.),
+ evaluator
+ .evaluate(conf, new CollectedMetricHistory(topology,
metricHistory))
+ .get(source)
+ .get(ScalingMetric.TRUE_PROCESSING_RATE));
+ }
+
+ @Test
+ public void testMissingObservedTpr() {
+ var source = new JobVertexID();
+ var conf = new Configuration();
+
+ var topology = new JobTopology(new VertexInfo(source,
Collections.emptySet(), 1, 1));
+ var evaluator = new ScalingMetricEvaluator();
+
+ var metricHistory = new TreeMap<Instant, CollectedMetrics>();
+ metricHistory.put(
+ Instant.ofEpochMilli(100),
+ new CollectedMetrics(
+ Map.of(
+ source,
+ Map.of(
+ ScalingMetric.LAG,
+ 0.,
+ ScalingMetric.TRUE_PROCESSING_RATE,
+ 300.,
+ ScalingMetric.CURRENT_PROCESSING_RATE,
+ 100.,
+ ScalingMetric.SOURCE_DATA_RATE,
+ 50.,
+ ScalingMetric.LOAD,
+ 10.)),
+ Map.of()));
+
+ // Test that we used busy time based TPR
+ assertEquals(
+ new EvaluatedScalingMetric(300., 300.),
+ evaluator
+ .evaluate(conf, new CollectedMetricHistory(topology,
metricHistory))
+ .get(source)
+ .get(ScalingMetric.TRUE_PROCESSING_RATE));
+ }
+
private Tuple2<Double, Double> getThresholds(
double inputTargetRate, double catchUpRate, Configuration conf) {
return getThresholds(inputTargetRate, catchUpRate, conf, false);
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
index 85a72e5a..697fc926 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
@@ -38,6 +38,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/** Tests for scaling metrics computation logic. */
public class ScalingMetricsTest {
+ private static final double PREV_TPR = 123;
+ private static final JobVertexID SOURCE = new JobVertexID();
+
@Test
public void testProcessingAndOutputMetrics() {
var source = new JobVertexID();
@@ -55,7 +58,7 @@ public class ScalingMetricsTest {
source,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, 100., Double.NaN,
Double.NaN),
+ new AggregatedMetric("", Double.NaN, 900., Double.NaN,
Double.NaN),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
aggSum(1000.),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
@@ -63,12 +66,15 @@ public class ScalingMetricsTest {
scalingMetrics,
topology,
15.,
- new Configuration());
+ new Configuration(),
+ () -> PREV_TPR);
assertEquals(
Map.of(
ScalingMetric.TRUE_PROCESSING_RATE,
- 10000.,
+ 1000. / 0.9,
+ ScalingMetric.OBSERVED_TPR,
+ PREV_TPR,
ScalingMetric.SOURCE_DATA_RATE,
1015.,
ScalingMetric.CURRENT_PROCESSING_RATE,
@@ -89,12 +95,15 @@ public class ScalingMetricsTest {
scalingMetrics,
topology,
-50.,
- new Configuration());
+ new Configuration(),
+ () -> PREV_TPR);
assertEquals(
Map.of(
ScalingMetric.TRUE_PROCESSING_RATE,
10000.,
+ ScalingMetric.OBSERVED_TPR,
+ PREV_TPR,
ScalingMetric.SOURCE_DATA_RATE,
950.,
ScalingMetric.CURRENT_PROCESSING_RATE,
@@ -114,7 +123,8 @@ public class ScalingMetricsTest {
scalingMetrics,
topology,
0.,
- new Configuration());
+ new Configuration(),
+ () -> 0.);
assertEquals(
Map.of(
@@ -140,7 +150,8 @@ public class ScalingMetricsTest {
scalingMetrics,
topology,
0.,
- conf);
+ conf,
+ () -> 0.);
assertEquals(
Map.of(
@@ -179,7 +190,8 @@ public class ScalingMetricsTest {
scalingMetrics,
topology,
0.,
- conf);
+ conf,
+ () -> PREV_TPR);
// Make sure vertex won't be scaled
assertTrue(conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS).contains(source.toHexString()));
@@ -239,8 +251,6 @@ public class ScalingMetricsTest {
ScalingMetric.TRUE_PROCESSING_RATE,
// When not busy at all, we have infinite processing
power
Double.POSITIVE_INFINITY,
- ScalingMetric.SOURCE_DATA_RATE,
- dataRate,
ScalingMetric.CURRENT_PROCESSING_RATE,
10.),
scalingMetrics);
@@ -256,8 +266,6 @@ public class ScalingMetricsTest {
ScalingMetric.TRUE_PROCESSING_RATE,
// When no records are coming in, we assume infinite
processing power
Double.POSITIVE_INFINITY,
- ScalingMetric.SOURCE_DATA_RATE,
- 0.,
ScalingMetric.CURRENT_PROCESSING_RATE,
0.),
scalingMetrics);
@@ -272,8 +280,6 @@ public class ScalingMetricsTest {
ScalingMetric.TRUE_PROCESSING_RATE,
// When no records are coming in, we assume infinite
processing power
Double.POSITIVE_INFINITY,
- ScalingMetric.SOURCE_DATA_RATE,
- 0.,
ScalingMetric.CURRENT_PROCESSING_RATE,
0.),
scalingMetrics);
@@ -287,8 +293,6 @@ public class ScalingMetricsTest {
ScalingMetric.TRUE_PROCESSING_RATE,
// Nothing is coming in, we must assume infinite
processing power
Double.POSITIVE_INFINITY,
- ScalingMetric.SOURCE_DATA_RATE,
- 0.,
ScalingMetric.CURRENT_PROCESSING_RATE,
0.),
scalingMetrics);
@@ -308,11 +312,11 @@ public class ScalingMetricsTest {
Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
ScalingMetrics.computeDataRateMetrics(
- source,
+ op,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
new AggregatedMetric("", Double.NaN, busyness,
Double.NaN, Double.NaN),
- FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
+ FlinkMetric.NUM_RECORDS_IN_PER_SEC,
new AggregatedMetric(
"", Double.NaN, Double.NaN, Double.NaN,
processingRate),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
@@ -320,7 +324,8 @@ public class ScalingMetricsTest {
scalingMetrics,
topology,
0.,
- new Configuration());
+ new Configuration(),
+ () -> 0.);
return scalingMetrics;
}
@@ -407,6 +412,74 @@ public class ScalingMetricsTest {
ScalingMetrics.computeOutputRatios(allMetrics, topology));
}
+ @Test
+ public void testComputeTprWithBackpressure() {
+ assertEquals(Double.NaN,
ScalingMetrics.computeObservedTprWithBackpressure(100, 1000));
+ assertEquals(500,
ScalingMetrics.computeObservedTprWithBackpressure(500., 0));
+ assertEquals(1000,
ScalingMetrics.computeObservedTprWithBackpressure(250, 750));
+ }
+
+ @Test
+ public void computeObservedTpr() {
+ // Without lag we cannot compute observed tpr, we compare against old
+ assertEquals(PREV_TPR, computeObservedTpr(500, 1000, 500, 500));
+
+ assertEquals(PREV_TPR, computeObservedTpr(0, 1000, 500, 500));
+
+ // When there is enough lag, observed rate is computed. Switch to
busyness because diff is
+ // within limit
+ assertEquals(900 / 0.9, computeObservedTpr(10000000, 900, 850, 100));
+
+ // Should stay with busyness after switching as diff is still small
+ assertEquals(900 / 0.91, computeObservedTpr(10000000, 900, 900, 90));
+
+ // Use observed when diff is large and switch to observed
+ assertEquals(1000 / 0.8, computeObservedTpr(10000000, 1000, 500, 200));
+ assertEquals(1000 / 0.81, computeObservedTpr(10000000, 1000, 500,
190));
+
+ // When no incoming records observed TPR should be infinity
+ assertEquals(Double.POSITIVE_INFINITY, computeObservedTpr(500, 0, 100,
100));
+ }
+
+ public static double computeObservedTpr(
+ double lag, double processingRate, double busyness, double
backpressure) {
+ return computeObservedTpr(lag, processingRate, busyness, backpressure,
new Configuration());
+ }
+
+ public static double computeObservedTpr(
+ double lag,
+ double processingRate,
+ double busyness,
+ double backpressure,
+ Configuration conf) {
+ var sink = new JobVertexID();
+ var topology =
+ new JobTopology(
+ new VertexInfo(SOURCE, Collections.emptySet(), 1, 1),
+ new VertexInfo(sink, Set.of(SOURCE), 1, 1));
+
+ Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
+ scalingMetrics.put(ScalingMetric.LAG, lag);
+ ScalingMetrics.computeDataRateMetrics(
+ SOURCE,
+ Map.of(
+ FlinkMetric.BUSY_TIME_PER_SEC,
+ new AggregatedMetric("", Double.NaN, busyness,
Double.NaN, Double.NaN),
+ FlinkMetric.BACKPRESSURE_TIME_PER_SEC,
+ new AggregatedMetric("", Double.NaN, Double.NaN,
backpressure, Double.NaN),
+ FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
+ new AggregatedMetric(
+ "", Double.NaN, Double.NaN, Double.NaN,
processingRate),
+ FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+ aggSum(0)),
+ scalingMetrics,
+ topology,
+ 0.,
+ conf,
+ () -> PREV_TPR);
+ return scalingMetrics.get(ScalingMetric.OBSERVED_TPR);
+ }
+
private static AggregatedMetric aggSum(double sum) {
return new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN,
sum);
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java
index f9816533..97e70267 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java
@@ -33,10 +33,8 @@ public class AutoscalerFactory {
public static JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext>
create(
KubernetesClient client, EventRecorder eventRecorder) {
- KubernetesAutoScalerStateStore stateStore =
- new KubernetesAutoScalerStateStore(new ConfigMapStore(client));
- KubernetesAutoScalerEventHandler eventHandler =
- new KubernetesAutoScalerEventHandler(eventRecorder);
+ var stateStore = new KubernetesAutoScalerStateStore(new
ConfigMapStore(client));
+ var eventHandler = new KubernetesAutoScalerEventHandler(eventRecorder);
return new JobAutoScalerImpl<>(
new RestApiMetricsCollector<>(),
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java
index 2a60d4af..5ded9719 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java
@@ -44,12 +44,11 @@ public class ConfigMapStore {
private final KubernetesClient kubernetesClient;
- // The cache for each resourceId may be in three situations:
- // 1. The resourceId isn't exist : ConfigMap isn't loaded from kubernetes,
or it's removed.
- // 2. The resourceId is exist, and value is the Optional.empty() : We have
loaded the ConfigMap
- // from kubernetes, but the ConfigMap isn't created at kubernetes side.
- // 3. The resourceId is exist, and the Optional isn't empty : We have
loaded the ConfigMap from
- // kubernetes, it may be not same with kubernetes side due to it's not
flushed after updating.
+ // The cache for each resourceId may be in three states:
+ // 1. The resourceId doesn't exist : ConfigMap isn't loaded from
kubernetes, or it's deleted
+ // 2. Exists, Optional.empty() : The ConfigMap doesn't exist in Kubernetes
+ // 3. Exists, Not Empty : We have loaded the ConfigMap from kubernetes, it
may not be the same
+ // if not flushed already
private final ConcurrentHashMap<ResourceID, Optional<ConfigMap>> cache =
new ConcurrentHashMap<>();
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java
index 2b446cbe..5f45f368 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java
@@ -108,15 +108,15 @@ public class KubernetesAutoScalerStateStore
}
@Override
- public void storeEvaluatedMetrics(
+ public void storeCollectedMetrics(
KubernetesJobAutoScalerContext jobContext,
- SortedMap<Instant, CollectedMetrics> evaluatedMetrics) {
+ SortedMap<Instant, CollectedMetrics> metrics) {
configMapStore.putSerializedState(
- jobContext, COLLECTED_METRICS_KEY,
serializeEvaluatedMetrics(evaluatedMetrics));
+ jobContext, COLLECTED_METRICS_KEY,
serializeEvaluatedMetrics(metrics));
}
@Override
- public Optional<SortedMap<Instant, CollectedMetrics>> getEvaluatedMetrics(
+ public Optional<SortedMap<Instant, CollectedMetrics>> getCollectedMetrics(
KubernetesJobAutoScalerContext jobContext) {
Optional<String> serializedEvaluatedMetricsOpt =
configMapStore.getSerializedState(jobContext,
COLLECTED_METRICS_KEY);
@@ -135,7 +135,7 @@ public class KubernetesAutoScalerStateStore
}
@Override
- public void removeEvaluatedMetrics(KubernetesJobAutoScalerContext
jobContext) {
+ public void removeCollectedMetrics(KubernetesJobAutoScalerContext
jobContext) {
configMapStore.removeSerializedState(jobContext,
COLLECTED_METRICS_KEY);
}
@@ -217,7 +217,7 @@ public class KubernetesAutoScalerStateStore
.orElse(0);
Optional<SortedMap<Instant, CollectedMetrics>> evaluatedMetricsOpt =
- getEvaluatedMetrics(context);
+ getCollectedMetrics(context);
if (evaluatedMetricsOpt.isEmpty()) {
return;
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java
index 4618a498..21f0c7c3 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java
@@ -218,17 +218,17 @@ public class KubernetesAutoScalerStateStoreTest {
var now = Instant.now();
Assertions.assertEquals(scalingHistory,
getTrimmedScalingHistory(stateStore, ctx, now));
-
assertThat(stateStore.getEvaluatedMetrics(ctx)).hasValue(metricHistory);
+
assertThat(stateStore.getCollectedMetrics(ctx)).hasValue(metricHistory);
// Override with compressed data
var newTs = Instant.now();
addToScalingHistoryAndStore(stateStore, ctx, newTs, Map.of());
- stateStore.storeEvaluatedMetrics(ctx, metricHistory);
+ stateStore.storeCollectedMetrics(ctx, metricHistory);
// Make sure we can still access everything
Assertions.assertEquals(scalingHistory,
getTrimmedScalingHistory(stateStore, ctx, newTs));
-
assertThat(stateStore.getEvaluatedMetrics(ctx)).hasValue(metricHistory);
+
assertThat(stateStore.getCollectedMetrics(ctx)).hasValue(metricHistory);
}
@Test
@@ -265,7 +265,7 @@ public class KubernetesAutoScalerStateStoreTest {
new ScalingSummary(
1, 2, Map.of(ScalingMetric.LAG,
EvaluatedScalingMetric.of(2.)))));
- stateStore.storeEvaluatedMetrics(ctx, metricHistory);
+ stateStore.storeCollectedMetrics(ctx, metricHistory);
assertFalse(
configMapStore
@@ -314,7 +314,7 @@ public class KubernetesAutoScalerStateStoreTest {
configMapStore.getSerializedState(
ctx,
KubernetesAutoScalerStateStore.COLLECTED_METRICS_KEY))
.isPresent();
- assertThat(stateStore.getEvaluatedMetrics(ctx)).isEmpty();
+ assertThat(stateStore.getCollectedMetrics(ctx)).isEmpty();
assertThat(
configMapStore.getSerializedState(
ctx,
KubernetesAutoScalerStateStore.COLLECTED_METRICS_KEY))