mxm commented on code in PR #533:
URL: https://github.com/apache/flink-kubernetes-operator/pull/533#discussion_r1117086633
##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java:
##########
@@ -222,19 +223,30 @@ private Map<JobVertexID, ScalingSummary> computeScalingSummary(
             Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) {
         var out = new HashMap<JobVertexID, ScalingSummary>();
+        var excludeVertexIdList = conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
         evaluatedMetrics.forEach(
                 (v, metrics) -> {
-                    var currentParallelism =
-                            (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();
-                    var newParallelism =
-                            jobVertexScaler.computeScaleTargetParallelism(
-                                    resource,
-                                    conf,
+                    if (excludeVertexIdList.contains(v.toHexString())) {
+                        LOG.debug(
+                                "Vertex {} is part of `vertex.exclude.ids` config, Ignoring it for scaling",
+                                v);
Review Comment:
Can we make this info level?
##########
flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java:
##########
@@ -183,6 +184,29 @@ op1, evaluated(1, 70, 100),
         assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary));
     }
+
+    @Test
+    public void testVertexesExclusionForScaling() {
+        var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a";
+        var source = JobVertexID.fromHexString(sourceHexString);
+        var filterOperatorHexString = "869fb403873411306404e9f2e4438c0e";
+        var filterOperator = JobVertexID.fromHexString(filterOperatorHexString);
+        var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7";
+        var sink = JobVertexID.fromHexString(sinkHexString);
+
+        var scalingInfo = new AutoScalerInfo(new HashMap<>());
+        var metrics =
+                Map.of(
+                        source,
+                        evaluated(1, 70, 100),
+                        filterOperator,
+                        evaluated(1, 100, 80),
+                        sink,
+                        evaluated(1, 70, 80));
+        // filter operator should not scale
+        conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of(filterOperatorHexString));
+        assertFalse(scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics));
Review Comment:
Can we test that it would have scaled if the configuration was not set?
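One way to cover both directions of this check, sketched with plain types rather than the real test harness (the class and method names below are illustrative, not the operator's API): evaluate the same metrics once with and once without the exclude list, so the test proves it is the config that suppresses the scaling.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for the exclusion logic, reduced to plain types so
// the idea is self-contained.
public class ExclusionSketch {

    /** Returns the vertex ids that would actually be rescaled. */
    static Set<String> verticesToScale(Map<String, Boolean> wantsScaling, List<String> excludeIds) {
        return wantsScaling.entrySet().stream()
                .filter(Map.Entry::getValue)                   // vertex is outside its utilization target
                .filter(e -> !excludeIds.contains(e.getKey())) // and not listed in vertex.exclude.ids
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        var filterId = "869fb403873411306404e9f2e4438c0e";
        var metrics = Map.of(filterId, true); // over target, so it wants to scale

        // With the exclude list set, the vertex must be skipped...
        if (!verticesToScale(metrics, List.of(filterId)).isEmpty()) {
            throw new AssertionError("excluded vertex was scaled");
        }
        // ...and without the config it would have scaled (the missing test case).
        if (!verticesToScale(metrics, List.of()).contains(filterId)) {
            throw new AssertionError("vertex should scale when no exclusion is set");
        }
        System.out.println("ok");
    }
}
```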
##########
flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java:
##########
@@ -222,19 +223,30 @@ private Map<JobVertexID, ScalingSummary> computeScalingSummary(
             Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) {
         var out = new HashMap<JobVertexID, ScalingSummary>();
+        var excludeVertexIdList = conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
         evaluatedMetrics.forEach(
                 (v, metrics) -> {
-                    var currentParallelism =
-                            (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();
-                    var newParallelism =
-                            jobVertexScaler.computeScaleTargetParallelism(
-                                    resource,
-                                    conf,
+                    if (excludeVertexIdList.contains(v.toHexString())) {
+                        LOG.debug(
+                                "Vertex {} is part of `vertex.exclude.ids` config, Ignoring it for scaling",
+                                v);
Review Comment:
This still requires more work to make the feature correct for non-sink
vertices. If you exclude a non-sink vertex, its downstream operators might
still be scaled based on the target rates computed for the excluded vertex.
We need to make sure the target rates are derived from the current rate of
the excluded vertex instead. Here:
https://github.com/apache/flink-kubernetes-operator/blob/c6cd7cbb83c5257960daebe65214035a6732ad0d/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java#L91
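One possible shape for that fix, sketched with hypothetical names outside the real ScalingMetricEvaluator: an excluded vertex keeps its parallelism frozen, so the rate propagated to its downstream vertices should be its current processing rate rather than the target rate the evaluator computed for it.

```java
import java.util.List;

// Hypothetical sketch of the decision rule only; rateForDownstream() is not
// part of the operator's API.
public class UpstreamRateSketch {

    /**
     * If the upstream vertex is on the exclude list its parallelism will not
     * change, so its current rate is the only rate downstream vertices can be
     * sized against; otherwise the computed target rate is used as before.
     */
    static double rateForDownstream(
            String upstreamId,
            double currentRate,
            double computedTargetRate,
            List<String> excludeIds) {
        return excludeIds.contains(upstreamId) ? currentRate : computedTargetRate;
    }

    public static void main(String[] args) {
        var filterId = "869fb403873411306404e9f2e4438c0e";
        // Excluded: downstream sees the current rate (800), not the target (1200).
        if (rateForDownstream(filterId, 800, 1200, List.of(filterId)) != 800) {
            throw new AssertionError("excluded vertex must propagate its current rate");
        }
        // Not excluded: the computed target rate flows downstream unchanged.
        if (rateForDownstream(filterId, 800, 1200, List.of()) != 1200) {
            throw new AssertionError("non-excluded vertex must propagate its target rate");
        }
        System.out.println("ok");
    }
}
```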
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]