This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 7d90ce87 [FLINK-35896][autoscaler] Handle exception events in 
RescaleApiScalingRealizer (#856)
7d90ce87 is described below

commit 7d90ce8716153e46608831a2aa5057a842262fdc
Author: big face cat <[email protected]>
AuthorDate: Tue Jul 30 16:33:05 2024 +0800

    [FLINK-35896][autoscaler] Handle exception events in 
RescaleApiScalingRealizer (#856)
---
 .../realizer/RescaleApiScalingRealizer.java        |  4 +--
 .../realizer/RescaleApiScalingRealizerTest.java    |  6 ++--
 .../flink/autoscaler/realizer/ScalingRealizer.java | 17 +++++++---
 .../flink/autoscaler/JobAutoScalerImplTest.java    | 38 ++++++++++++++++++++++
 4 files changed, 55 insertions(+), 10 deletions(-)

diff --git 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
index abf0f95b..f575e8b1 100644
--- 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
+++ 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
@@ -66,7 +66,7 @@ public class RescaleApiScalingRealizer<KEY, Context extends 
JobAutoScalerContext
 
     @Override
     public void realizeParallelismOverrides(
-            Context context, Map<String, String> parallelismOverrides) {
+            Context context, Map<String, String> parallelismOverrides) throws 
Exception {
         Configuration conf = context.getConfiguration();
         if (!conf.get(JobManagerOptions.SCHEDULER)
                 .equals(JobManagerOptions.SchedulerType.Adaptive)) {
@@ -121,8 +121,6 @@ public class RescaleApiScalingRealizer<KEY, Context extends 
JobAutoScalerContext
             } else {
                 LOG.info("Vertex resources requirements already match target, 
nothing to do...");
             }
-        } catch (Exception e) {
-            LOG.warn("Failed to apply parallelism overrides.", e);
         }
     }
 
diff --git 
a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
 
b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
index b16cbdac..f8dc839b 100644
--- 
a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
+++ 
b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
@@ -59,7 +59,7 @@ class RescaleApiScalingRealizerTest {
      */
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    void testUpdateResourceRequirements(boolean resourceIsChanged) {
+    void testUpdateResourceRequirements(boolean resourceIsChanged) throws 
Exception {
         var jobID = new JobID();
         var jobVertex1 = new JobVertexID().toHexString();
         var jobVertex2 = new JobVertexID().toHexString();
@@ -110,7 +110,7 @@ class RescaleApiScalingRealizerTest {
     }
 
     @Test
-    void testDisableAdaptiveScheduler() {
+    void testDisableAdaptiveScheduler() throws Exception {
         var jobID = new JobID();
         var jobVertex1 = new JobVertexID().toHexString();
         var jobVertex2 = new JobVertexID().toHexString();
@@ -132,7 +132,7 @@ class RescaleApiScalingRealizerTest {
     }
 
     @Test
-    void testJobNotRunning() {
+    void testJobNotRunning() throws Exception {
         var jobID = new JobID();
         var jobVertex1 = new JobVertexID().toHexString();
         var jobVertex2 = new JobVertexID().toHexString();
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
index 12b7ca36..c7446819 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
@@ -33,9 +33,18 @@ import java.util.Map;
 @Experimental
 public interface ScalingRealizer<KEY, Context extends 
JobAutoScalerContext<KEY>> {
 
-    /** Update job's parallelism to parallelismOverrides. */
-    void realizeParallelismOverrides(Context context, Map<String, String> 
parallelismOverrides);
+    /**
+     * Update job's parallelism to parallelismOverrides.
+     *
+     * @throws Exception Error during realize parallelism overrides.
+     */
+    void realizeParallelismOverrides(Context context, Map<String, String> 
parallelismOverrides)
+            throws Exception;
 
-    /** Updates the TaskManager memory configuration. */
-    void realizeConfigOverrides(Context context, ConfigChanges configChanges);
+    /**
+     * Updates the TaskManager memory configuration.
+     *
+     * @throws Exception Error during realize config overrides.
+     */
+    void realizeConfigOverrides(Context context, ConfigChanges configChanges) 
throws Exception;
 }
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
index 4ec10b43..c9d74e65 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
 import org.apache.flink.autoscaler.metrics.CollectedMetrics;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.autoscaler.metrics.TestMetrics;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
 import org.apache.flink.autoscaler.realizer.TestingScalingRealizer;
 import org.apache.flink.autoscaler.state.AutoScalerStateStore;
 import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
@@ -190,6 +191,43 @@ public class JobAutoScalerImplTest {
                 0, 
autoscaler.flinkMetrics.get(context.getJobKey()).getNumErrorsCount());
     }
 
+    @Test
+    public void testRealizeParallelismOverridesExceptions() throws Exception {
+        JobVertexID jobVertexID = new JobVertexID();
+        JobTopology jobTopology = new JobTopology(new VertexInfo(jobVertexID, 
Map.of(), 1, 20));
+        var metricsCollector =
+                new TestingMetricsCollector<JobID, 
JobAutoScalerContext<JobID>>(jobTopology);
+        ScalingRealizer<JobID, JobAutoScalerContext<JobID>>
+                realizeParallelismOverridesWithExceptionsScalingRealizer =
+                        new ScalingRealizer<>() {
+                            @Override
+                            public void realizeConfigOverrides(
+                                    JobAutoScalerContext context, 
ConfigChanges configChanges) {}
+
+                            @Override
+                            public void realizeParallelismOverrides(
+                                    JobAutoScalerContext context, Map 
parallelismOverrides) {
+                                throw new RuntimeException(
+                                        "Test Realize Parallelism Overrides 
Exceptions.");
+                            }
+                        };
+        stateStore.storeParallelismOverrides(context, 
Map.of(jobVertexID.toHexString(), "2"));
+
+        var autoscaler =
+                new JobAutoScalerImpl<>(
+                        metricsCollector,
+                        null,
+                        null,
+                        eventCollector,
+                        
realizeParallelismOverridesWithExceptionsScalingRealizer,
+                        stateStore);
+
+        // Should produce an error
+        autoscaler.scale(context);
+        Assertions.assertEquals(
+                1, 
autoscaler.flinkMetrics.get(context.getJobKey()).getNumErrorsCount());
+    }
+
     @Test
     void testParallelismOverrides() throws Exception {
         var autoscaler =

Reply via email to