This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 20cf0ee4 [FLINK-30920] Prevent clearing any manually defined vertex exclusions (#569)
20cf0ee4 is described below

commit 20cf0ee471081a4f7ec8bc812f3b4bbdd125938b
Author: Maximilian Michels <[email protected]>
AuthorDate: Wed Apr 19 16:49:49 2023 +0200

    [FLINK-30920] Prevent clearing any manually defined vertex exclusions (#569)
---
 .../kubernetes/operator/autoscaler/metrics/ScalingMetrics.java   | 9 +--------
 .../operator/autoscaler/metrics/ScalingMetricsTest.java          | 5 +++++
 2 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
index 8876ee14..54fe7bb6 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
@@ -128,8 +128,6 @@ public class ScalingMetrics {
             excludeVertexFromScaling(conf, jobVertexId);
             // Pretend that the load is balanced because we don't know any better
             busyTimeMsPerSecond = conf.get(AutoScalerOptions.TARGET_UTILIZATION) * 1000;
-        } else {
-            includeVertexForScaling(conf, jobVertexId);
         }
         return busyTimeMsPerSecond;
     }
@@ -197,18 +195,13 @@ public class ScalingMetrics {
         return rate / (busyTimeMsPerSecond / 1000);
     }
 
+    /** Temporarily exclude vertex from scaling for this run. This does not update the spec. */
     private static void excludeVertexFromScaling(Configuration conf, JobVertexID jobVertexId) {
         Set<String> excludedIds = new HashSet<>(conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS));
         excludedIds.add(jobVertexId.toHexString());
         conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, new ArrayList<>(excludedIds));
     }
 
-    private static void includeVertexForScaling(Configuration conf, JobVertexID jobVertexId) {
-        Set<String> excludedIds = new HashSet<>(conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS));
-        excludedIds.remove(jobVertexId.toHexString());
-        conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, new ArrayList<>(excludedIds));
-    }
-
     public static double roundMetric(double value) {
         double rounded = Precision.round(value, 3);
         // Never round down to 0, return original value instead
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
index 2341601e..c9aa95b2 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -177,6 +178,8 @@ public class ScalingMetricsTest {
                         new VertexInfo(sink, Collections.singleton(source), 10, 100));
 
         Configuration conf = new Configuration();
+        assertTrue(conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS).isEmpty());
+        conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of(sink.toHexString()));
 
         Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
         ScalingMetrics.computeDataRateMetrics(
@@ -196,6 +199,8 @@ public class ScalingMetricsTest {
 
         // Make sure vertex won't be scaled
         assertTrue(conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS).contains(source.toHexString()));
+        // Existing overrides should be preserved
+        assertTrue(conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS).contains(sink.toHexString()));
         // Legacy source rates are computed based on the current rate and a balanced utilization
         assertEquals(
                 2000 / conf.get(AutoScalerOptions.TARGET_UTILIZATION),

Reply via email to