This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 20cf0ee4 [FLINK-30920] Prevent clearing any manually defined vertex
exclusions (#569)
20cf0ee4 is described below
commit 20cf0ee471081a4f7ec8bc812f3b4bbdd125938b
Author: Maximilian Michels <[email protected]>
AuthorDate: Wed Apr 19 16:49:49 2023 +0200
[FLINK-30920] Prevent clearing any manually defined vertex exclusions (#569)
---
.../kubernetes/operator/autoscaler/metrics/ScalingMetrics.java | 9 +--------
.../operator/autoscaler/metrics/ScalingMetricsTest.java | 5 +++++
2 files changed, 6 insertions(+), 8 deletions(-)
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
index 8876ee14..54fe7bb6 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
@@ -128,8 +128,6 @@ public class ScalingMetrics {
excludeVertexFromScaling(conf, jobVertexId);
// Pretend that the load is balanced because we don't know any
better
busyTimeMsPerSecond =
conf.get(AutoScalerOptions.TARGET_UTILIZATION) * 1000;
- } else {
- includeVertexForScaling(conf, jobVertexId);
}
return busyTimeMsPerSecond;
}
@@ -197,18 +195,13 @@ public class ScalingMetrics {
return rate / (busyTimeMsPerSecond / 1000);
}
+ /** Temporarily exclude vertex from scaling for this run. This does not
update the spec. */
private static void excludeVertexFromScaling(Configuration conf,
JobVertexID jobVertexId) {
Set<String> excludedIds = new
HashSet<>(conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS));
excludedIds.add(jobVertexId.toHexString());
conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, new
ArrayList<>(excludedIds));
}
- private static void includeVertexForScaling(Configuration conf,
JobVertexID jobVertexId) {
- Set<String> excludedIds = new
HashSet<>(conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS));
- excludedIds.remove(jobVertexId.toHexString());
- conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, new
ArrayList<>(excludedIds));
- }
-
public static double roundMetric(double value) {
double rounded = Precision.round(value, 3);
// Never round down to 0, return original value instead
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
index 2341601e..c9aa95b2 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -177,6 +178,8 @@ public class ScalingMetricsTest {
new VertexInfo(sink, Collections.singleton(source),
10, 100));
Configuration conf = new Configuration();
+ assertTrue(conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS).isEmpty());
+ conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS,
List.of(sink.toHexString()));
Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
ScalingMetrics.computeDataRateMetrics(
@@ -196,6 +199,8 @@ public class ScalingMetricsTest {
// Make sure vertex won't be scaled
assertTrue(conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS).contains(source.toHexString()));
+ // Existing overrides should be preserved
+
assertTrue(conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS).contains(sink.toHexString()));
// Legacy source rates are computed based on the current rate and a
balanced utilization
assertEquals(
2000 / conf.get(AutoScalerOptions.TARGET_UTILIZATION),