This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 7d90ce87 [FLINK-35896][autoscaler] Handle exception events in RescaleApiScalingRealizer (#856)
7d90ce87 is described below
commit 7d90ce8716153e46608831a2aa5057a842262fdc
Author: big face cat <[email protected]>
AuthorDate: Tue Jul 30 16:33:05 2024 +0800
[FLINK-35896][autoscaler] Handle exception events in RescaleApiScalingRealizer (#856)
---
.../realizer/RescaleApiScalingRealizer.java | 4 +--
.../realizer/RescaleApiScalingRealizerTest.java | 6 ++--
.../flink/autoscaler/realizer/ScalingRealizer.java | 17 +++++++---
.../flink/autoscaler/JobAutoScalerImplTest.java | 38 ++++++++++++++++++++++
4 files changed, 55 insertions(+), 10 deletions(-)
diff --git a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
index abf0f95b..f575e8b1 100644
--- a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
+++ b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizer.java
@@ -66,7 +66,7 @@ public class RescaleApiScalingRealizer<KEY, Context extends JobAutoScalerContext
@Override
public void realizeParallelismOverrides(
- Context context, Map<String, String> parallelismOverrides) {
+ Context context, Map<String, String> parallelismOverrides) throws
Exception {
Configuration conf = context.getConfiguration();
if (!conf.get(JobManagerOptions.SCHEDULER)
.equals(JobManagerOptions.SchedulerType.Adaptive)) {
@@ -121,8 +121,6 @@ public class RescaleApiScalingRealizer<KEY, Context extends JobAutoScalerContext
} else {
LOG.info("Vertex resources requirements already match target,
nothing to do...");
}
- } catch (Exception e) {
- LOG.warn("Failed to apply parallelism overrides.", e);
}
}
diff --git a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
index b16cbdac..f8dc839b 100644
--- a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
+++ b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/realizer/RescaleApiScalingRealizerTest.java
@@ -59,7 +59,7 @@ class RescaleApiScalingRealizerTest {
*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
- void testUpdateResourceRequirements(boolean resourceIsChanged) {
+ void testUpdateResourceRequirements(boolean resourceIsChanged) throws
Exception {
var jobID = new JobID();
var jobVertex1 = new JobVertexID().toHexString();
var jobVertex2 = new JobVertexID().toHexString();
@@ -110,7 +110,7 @@ class RescaleApiScalingRealizerTest {
}
@Test
- void testDisableAdaptiveScheduler() {
+ void testDisableAdaptiveScheduler() throws Exception {
var jobID = new JobID();
var jobVertex1 = new JobVertexID().toHexString();
var jobVertex2 = new JobVertexID().toHexString();
@@ -132,7 +132,7 @@ class RescaleApiScalingRealizerTest {
}
@Test
- void testJobNotRunning() {
+ void testJobNotRunning() throws Exception {
var jobID = new JobID();
var jobVertex1 = new JobVertexID().toHexString();
var jobVertex2 = new JobVertexID().toHexString();
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
index 12b7ca36..c7446819 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
@@ -33,9 +33,18 @@ import java.util.Map;
@Experimental
public interface ScalingRealizer<KEY, Context extends JobAutoScalerContext<KEY>> {
- /** Update job's parallelism to parallelismOverrides. */
- void realizeParallelismOverrides(Context context, Map<String, String> parallelismOverrides);
+ /**
+ * Updates the job's parallelism to the given parallelismOverrides.
+ *
+ * @throws Exception if an error occurs while realizing the parallelism overrides.
+ */
+ void realizeParallelismOverrides(Context context, Map<String, String> parallelismOverrides)
+ throws Exception;
- /** Updates the TaskManager memory configuration. */
- void realizeConfigOverrides(Context context, ConfigChanges configChanges);
+ /**
+ * Updates the TaskManager memory configuration.
+ *
+ * @throws Exception if an error occurs while realizing the config overrides.
+ */
+ void realizeConfigOverrides(Context context, ConfigChanges configChanges) throws Exception;
}
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
index 4ec10b43..c9d74e65 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.metrics.TestMetrics;
+import org.apache.flink.autoscaler.realizer.ScalingRealizer;
import org.apache.flink.autoscaler.realizer.TestingScalingRealizer;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
@@ -190,6 +191,43 @@ public class JobAutoScalerImplTest {
0,
autoscaler.flinkMetrics.get(context.getJobKey()).getNumErrorsCount());
}
+ @Test
+ public void testRealizeParallelismOverridesExceptions() throws Exception {
+ JobVertexID jobVertexID = new JobVertexID();
+ JobTopology jobTopology = new JobTopology(new VertexInfo(jobVertexID,
Map.of(), 1, 20));
+ var metricsCollector =
+ new TestingMetricsCollector<JobID,
JobAutoScalerContext<JobID>>(jobTopology);
+ ScalingRealizer<JobID, JobAutoScalerContext<JobID>>
+ realizeParallelismOverridesWithExceptionsScalingRealizer =
+ new ScalingRealizer<>() {
+ @Override
+ public void realizeConfigOverrides(
+ JobAutoScalerContext context,
ConfigChanges configChanges) {}
+
+ @Override
+ public void realizeParallelismOverrides(
+ JobAutoScalerContext context, Map
parallelismOverrides) {
+ throw new RuntimeException(
+ "Test Realize Parallelism Overrides
Exceptions.");
+ }
+ };
+ stateStore.storeParallelismOverrides(context,
Map.of(jobVertexID.toHexString(), "2"));
+
+ var autoscaler =
+ new JobAutoScalerImpl<>(
+ metricsCollector,
+ null,
+ null,
+ eventCollector,
+
realizeParallelismOverridesWithExceptionsScalingRealizer,
+ stateStore);
+
+ // Should produce an error
+ autoscaler.scale(context);
+ Assertions.assertEquals(
+ 1,
autoscaler.flinkMetrics.get(context.getJobKey()).getNumErrorsCount());
+ }
+
@Test
void testParallelismOverrides() throws Exception {
var autoscaler =