This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ff5a8a9769 [FLINK-31893][runtime] Introduce AdaptiveBatchScheduler failure enrichment/labeling
1ff5a8a9769 is described below

commit 1ff5a8a976995ee083e8ccce8aaa2edf9a696ca2
Author: Panagiotis Garefalakis <[email protected]>
AuthorDate: Thu May 18 23:36:30 2023 -0700

    [FLINK-31893][runtime] Introduce AdaptiveBatchScheduler failure enrichment/labeling
    
    * Label failures raised in AdaptiveBatchScheduler#initializeVerticesIfPossible by replacing the failJob call with handleGlobalFailure (wrapping the cause in a SuppressRestartsException) on an unrecoverable error
    * Extend AdaptiveBatchSchedulerTest to validate that vertex initialization failures are labeled by the configured failure enrichers
---
 .../adaptivebatch/AdaptiveBatchScheduler.java      |  4 +-
 .../adaptivebatch/AdaptiveBatchSchedulerTest.java  | 72 +++++++++++++++++++---
 2 files changed, 65 insertions(+), 11 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
index afe64c27728..11cf5271042 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -43,7 +44,6 @@ import 
org.apache.flink.runtime.executiongraph.ParallelismAndInputInfos;
 import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
 import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
 import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
-import org.apache.flink.runtime.failure.FailureEnricherUtils;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -300,7 +300,7 @@ public class AdaptiveBatchScheduler extends 
DefaultScheduler {
             }
         } catch (JobException ex) {
             log.error("Unexpected error occurred when initializing 
ExecutionJobVertex", ex);
-            failJob(ex, System.currentTimeMillis(), 
FailureEnricherUtils.EMPTY_FAILURE_LABELS);
+            this.handleGlobalFailure(new SuppressRestartsException(ex));
         }
 
         if (newlyInitializedJobVertices.size() > 0) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
index 2f4acebdbaa..1940e04ed84 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.scheduler.adaptivebatch;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BatchExecutionOptions;
+import org.apache.flink.core.failure.FailureEnricher;
+import org.apache.flink.core.failure.TestingFailureEnricher;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -30,7 +32,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
-import 
org.apache.flink.runtime.executiongraph.failover.flip1.FixedDelayRestartBackoffTimeStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -42,6 +45,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
+import 
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.testutils.TestingUtils;
@@ -55,6 +59,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import javax.annotation.Nullable;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -70,6 +75,7 @@ import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.createFin
 import static 
org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDeciderTest.createDecider;
 import static 
org.apache.flink.shaded.guava30.com.google.common.collect.Iterables.getOnlyElement;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link AdaptiveBatchScheduler}. */
 class AdaptiveBatchSchedulerTest {
@@ -91,6 +97,23 @@ class AdaptiveBatchSchedulerTest {
         taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
     }
 
+    @Test
+    void testVertexInitializationFailureIsLabeled() throws Exception {
+        final JobGraph jobGraph = createBrokenJobGraph();
+        final TestingFailureEnricher failureEnricher = new 
TestingFailureEnricher();
+        final RestartBackoffTimeStrategy restartStrategy =
+                new 
FixedDelayRestartBackoffTimeStrategyFactory(Integer.MAX_VALUE, 0L).create();
+        final SchedulerBase scheduler =
+                createScheduler(jobGraph, 
Collections.singleton(failureEnricher), restartStrategy);
+        // Triggered failure on initializeJobVertex that should be labeled
+        
assertThatThrownBy(scheduler::startScheduling).isInstanceOf(IllegalStateException.class);
+        final Iterable<RootExceptionHistoryEntry> exceptionHistory =
+                scheduler.requestJob().getExceptionHistory();
+        final RootExceptionHistoryEntry failure = 
exceptionHistory.iterator().next();
+        assertThat(failure.getException()).hasMessageContaining("The failure 
is not recoverable");
+        
assertThat(failure.getFailureLabels()).isEqualTo(failureEnricher.getFailureLabels());
+    }
+
     @Test
     void testAdaptiveBatchScheduler() throws Exception {
         JobGraph jobGraph = createJobGraph();
@@ -196,9 +219,7 @@ class AdaptiveBatchSchedulerTest {
                         .setDelayExecutor(taskRestartExecutor)
                         .setPartitionTracker(partitionTracker)
                         .setRestartBackoffTimeStrategy(
-                                new FixedDelayRestartBackoffTimeStrategy
-                                                
.FixedDelayRestartBackoffTimeStrategyFactory(10, 0)
-                                        .create())
+                                new 
FixedDelayRestartBackoffTimeStrategyFactory(10, 0).create())
                         .setVertexParallelismAndInputInfosDecider(
                                 createCustomParallelismDecider(maxParallelism))
                         .setDefaultMaxParallelism(maxParallelism)
@@ -460,13 +481,31 @@ class AdaptiveBatchSchedulerTest {
     }
 
     public JobGraph createJobGraph() {
+        return createJobGraph(false);
+    }
+
+    private JobGraph createBrokenJobGraph() {
+        // this will break the JobGraph by using the same dataset id twice
+        return createJobGraph(true);
+    }
+
+    public JobGraph createJobGraph(boolean broken) {
         final JobVertex source1 = createJobVertex("source1", 
SOURCE_PARALLELISM_1);
         final JobVertex source2 = createJobVertex("source2", 
SOURCE_PARALLELISM_2);
         final JobVertex sink = createJobVertex("sink", -1);
+        final IntermediateDataSetID sharedDataSetId = new 
IntermediateDataSetID();
         sink.connectNewDataSetAsInput(
-                source1, DistributionPattern.POINTWISE, 
ResultPartitionType.BLOCKING);
+                source1,
+                DistributionPattern.POINTWISE,
+                ResultPartitionType.BLOCKING,
+                broken ? sharedDataSetId : new IntermediateDataSetID(),
+                false);
         sink.connectNewDataSetAsInput(
-                source2, DistributionPattern.POINTWISE, 
ResultPartitionType.BLOCKING);
+                source2,
+                DistributionPattern.POINTWISE,
+                ResultPartitionType.BLOCKING,
+                broken ? sharedDataSetId : new IntermediateDataSetID(),
+                false);
         return new JobGraph(new JobID(), "test job", source1, source2, sink);
     }
 
@@ -482,11 +521,26 @@ class AdaptiveBatchSchedulerTest {
             VertexParallelismAndInputInfosDecider 
vertexParallelismAndInputInfosDecider,
             int defaultMaxParallelism)
             throws Exception {
-        return new DefaultSchedulerBuilder(
-                        jobGraph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
-                .setDelayExecutor(taskRestartExecutor)
+        return createSchedulerBuilder(jobGraph)
                 
.setVertexParallelismAndInputInfosDecider(vertexParallelismAndInputInfosDecider)
                 .setDefaultMaxParallelism(defaultMaxParallelism)
                 .buildAdaptiveBatchJobScheduler();
     }
+
+    private SchedulerBase createScheduler(
+            JobGraph jobGraph,
+            Collection<FailureEnricher> failureEnrichers,
+            RestartBackoffTimeStrategy strategy)
+            throws Exception {
+        return createSchedulerBuilder(jobGraph)
+                .setRestartBackoffTimeStrategy(strategy)
+                .setFailureEnrichers(failureEnrichers)
+                .buildAdaptiveBatchJobScheduler();
+    }
+
+    private DefaultSchedulerBuilder createSchedulerBuilder(JobGraph jobGraph) {
+        return new DefaultSchedulerBuilder(
+                        jobGraph, mainThreadExecutor, 
EXECUTOR_RESOURCE.getExecutor())
+                .setDelayExecutor(taskRestartExecutor);
+    }
 }

Reply via email to