mxm commented on code in PR #533:
URL: https://github.com/apache/flink-kubernetes-operator/pull/533#discussion_r1114357150
##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java:
##########
@@ -153,4 +154,12 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .defaultValue(MetricAggregator.MAX)
                     .withDescription(
                             "Metric aggregator to use for busyTime metrics. This affects how true processing/output rate will be computed. Using max allows us to handle jobs with data skew more robustly, while avg may provide better stability when we know that the load distribution is even.");
+
+    public static final ConfigOption<List<String>> EXCLUDE_VERTEX_IDS =
+            autoScalerConfig("exclude.vertex.ids")
Review Comment:
Maybe keep this under `vertex`?
```suggestion
autoScalerConfig("vertex.exclude.ids")
```
##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java:
##########
@@ -66,12 +66,15 @@ public Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluate(
         var scalingOutput = new HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>();
         var metricsHistory = collectedMetrics.getMetricHistory();
         var topology = collectedMetrics.getJobTopology();
+        var excludeVertexIdList = conf.get(AutoScalerOptions.EXCLUDE_VERTEX_IDS);
         for (var vertex : topology.getVerticesInTopologicalOrder()) {
-            scalingOutput.put(
-                    vertex,
-                    computeVertexScalingSummary(
-                            conf, scalingOutput, metricsHistory, topology, vertex));
+            if (!excludeVertexIdList.contains(vertex.toHexString())) {
+                scalingOutput.put(
Review Comment:
I think this will break the algorithm unless the vertex is a sink. The
autoscaler relies on traversing the DAG to compute the desired processing
rates, so if we exclude a non-sink vertex here, none of its downstream
vertices can be scaled.
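To make the concern concrete, here is a self-contained toy model of a topological evaluation pass (plain Java, not the operator's actual code; the vertex names and the rate propagation are illustrative): each vertex's target rate is derived from the rates already computed for its upstream vertices, so skipping a non-sink vertex leaves everything downstream of it without an input rate.

```java
import java.util.*;

// Toy model of the evaluator's topological pass: skipping a non-sink
// vertex poisons the target rates of all vertices downstream of it.
public class SkipVertexSketch {

    // source -> map -> sink, listed in topological order
    static final List<String> ORDER = List.of("source", "map", "sink");
    static final Map<String, List<String>> INPUTS = Map.of(
            "source", List.of(),
            "map", List.of("source"),
            "sink", List.of("map"));

    static Map<String, Double> evaluate(Set<String> excluded) {
        Map<String, Double> targetRate = new HashMap<>();
        for (String vertex : ORDER) {
            if (excluded.contains(vertex)) {
                continue; // the skip proposed in this diff
            }
            double rate = INPUTS.get(vertex).isEmpty()
                    ? 100.0 // sources: rate observed from the external system
                    // non-sources: derived from upstream vertices' rates
                    : INPUTS.get(vertex).stream()
                            .mapToDouble(u -> targetRate.getOrDefault(u, Double.NaN))
                            .sum();
            targetRate.put(vertex, rate);
        }
        return targetRate;
    }

    public static void main(String[] args) {
        // Excluding the intermediate "map" vertex leaves the sink with no
        // upstream rate to derive its target from.
        System.out.println(evaluate(Set.of("map")).get("sink")); // NaN
    }
}
```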
##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java:
##########
@@ -66,12 +66,15 @@ public Map<JobVertexID, Map<ScalingMetric,
EvaluatedScalingMetric>> evaluate(
var scalingOutput = new HashMap<JobVertexID, Map<ScalingMetric,
EvaluatedScalingMetric>>();
var metricsHistory = collectedMetrics.getMetricHistory();
var topology = collectedMetrics.getJobTopology();
+ var excludeVertexIdList =
conf.get(AutoScalerOptions.EXCLUDE_VERTEX_IDS);
for (var vertex : topology.getVerticesInTopologicalOrder()) {
- scalingOutput.put(
- vertex,
- computeVertexScalingSummary(
- conf, scalingOutput, metricsHistory, topology,
vertex));
+ if (!excludeVertexIdList.contains(vertex.toHexString())) {
+ scalingOutput.put(
Review Comment:
I think you should add this logic in the ScalingExecutor instead, which would
allow ignoring a vertex while still scaling all of its downstream vertices.
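A minimal sketch of that alternative, under the assumption that there is a single point where proposed parallelism changes are produced (the `proposeScalings` method and the map shapes here are illustrative, not the operator's real API): evaluate metrics for every vertex so the DAG traversal stays intact, and apply the exclusion list only to the proposed actions.

```java
import java.util.*;

// Sketch of executor-side filtering: metrics exist for all vertices, but
// excluded vertex ids are dropped only when scaling actions are proposed,
// so downstream vertices can still be evaluated and scaled.
public class ExecutorFilterSketch {

    static Map<String, Integer> proposeScalings(
            Map<String, Integer> desiredParallelism, Set<String> excludedIds) {
        Map<String, Integer> proposals = new LinkedHashMap<>(desiredParallelism);
        // Excluded vertices simply keep their current parallelism.
        proposals.keySet().removeAll(excludedIds);
        return proposals;
    }

    public static void main(String[] args) {
        // Vertex ids taken from the test in this PR.
        var desired = Map.of(
                "0bfd135746ac8efb3cce668b12e16d3a", 4,
                "a6b7102b8d3e3a9564998c1ffeb5e2b7", 8);
        var proposals = proposeScalings(
                desired, Set.of("a6b7102b8d3e3a9564998c1ffeb5e2b7"));
        // Only the non-excluded vertex receives a scaling proposal.
        System.out.println(proposals.keySet());
    }
}
```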
##########
flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java:
##########
@@ -212,6 +214,63 @@ public void testUtilizationBoundaryComputation() {
         assertEquals(Tuple2.of(778.0, 1350.0), getThresholds(700, 0, conf));
     }
+    @Test
+    public void testVertexesExclusionForEvaluation() {
+        var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a";
+        var source = JobVertexID.fromHexString(sourceHexString);
+        var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7";
+        var sink = JobVertexID.fromHexString(sinkHexString);
+
+        var topology =
+                new JobTopology(
+                        new VertexInfo(source, Collections.emptySet(), 1, 1),
+                        new VertexInfo(sink, Set.of(source), 1, 1));
+
+        var evaluator = new ScalingMetricEvaluator();
+
+        var metricHistory = new TreeMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>>();
+
+        metricHistory.put(
+                Instant.now(),
+                Map.of(
+                        source,
+                        Map.of(
+                                SOURCE_DATA_RATE,
+                                100.,
+                                LAG,
+                                0.,
+                                OUTPUT_RATIO,
+                                2.,
+                                TRUE_OUTPUT_RATE,
+                                200.,
+                                TRUE_PROCESSING_RATE,
+                                200.),
+                        sink,
+                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+
+        metricHistory.put(
+                Instant.now(),
+                Map.of(
+                        source,
+                        Map.of(
+                                SOURCE_DATA_RATE, 200.,
+                                LAG, 1000.,
+                                OUTPUT_RATIO, 2.,
+                                TRUE_OUTPUT_RATE, 200.,
+                                TRUE_PROCESSING_RATE, 200.),
+                        sink,
+                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+
+        var conf = new Configuration();
+
+        conf.set(AutoScalerOptions.EXCLUDE_VERTEX_IDS, List.of(sinkHexString));
+        var evaluatedMetrics =
+                evaluator.evaluate(conf, new CollectedMetrics(topology, metricHistory));
Review Comment:
I think we should also test the generation of scaling decisions via the
ScalingExecutor. This feature must also work for non-sink vertices, so we
could modify this test to use a non-sink vertex.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]