This is an automated email from the ASF dual-hosted git repository.
dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1ff5a8a9769 [FLINK-31893][runtime] Introduce AdaptiveBatchScheduler
failure enrichment/labeling
1ff5a8a9769 is described below
commit 1ff5a8a976995ee083e8ccce8aaa2edf9a696ca2
Author: Panagiotis Garefalakis <[email protected]>
AuthorDate: Thu May 18 23:36:30 2023 -0700
[FLINK-31893][runtime] Introduce AdaptiveBatchScheduler failure
enrichment/labeling
* Label failures that occur as part of
AdaptiveBatchScheduler#initializeVerticesIfPossible, replacing the failJob call with
handleGlobalFailure on an unrecoverable error
* Extend AdaptiveBatchSchedulerTest to validate functionality
---
.../adaptivebatch/AdaptiveBatchScheduler.java | 4 +-
.../adaptivebatch/AdaptiveBatchSchedulerTest.java | 72 +++++++++++++++++++---
2 files changed, 65 insertions(+), 11 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
index afe64c27728..11cf5271042 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -30,6 +30,7 @@ import
org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -43,7 +44,6 @@ import
org.apache.flink.runtime.executiongraph.ParallelismAndInputInfos;
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
-import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -300,7 +300,7 @@ public class AdaptiveBatchScheduler extends
DefaultScheduler {
}
} catch (JobException ex) {
log.error("Unexpected error occurred when initializing
ExecutionJobVertex", ex);
- failJob(ex, System.currentTimeMillis(),
FailureEnricherUtils.EMPTY_FAILURE_LABELS);
+ this.handleGlobalFailure(new SuppressRestartsException(ex));
}
if (newlyInitializedJobVertices.size() > 0) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
index 2f4acebdbaa..1940e04ed84 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.scheduler.adaptivebatch;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BatchExecutionOptions;
+import org.apache.flink.core.failure.FailureEnricher;
+import org.apache.flink.core.failure.TestingFailureEnricher;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
@@ -30,7 +32,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
-import
org.apache.flink.runtime.executiongraph.failover.flip1.FixedDelayRestartBackoffTimeStrategy;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
import
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -42,6 +45,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerBase;
+import
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
@@ -55,6 +59,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import javax.annotation.Nullable;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -70,6 +75,7 @@ import static
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.createFin
import static
org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDeciderTest.createDecider;
import static
org.apache.flink.shaded.guava30.com.google.common.collect.Iterables.getOnlyElement;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link AdaptiveBatchScheduler}. */
class AdaptiveBatchSchedulerTest {
@@ -91,6 +97,23 @@ class AdaptiveBatchSchedulerTest {
taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
}
+ @Test
+ void testVertexInitializationFailureIsLabeled() throws Exception {
+ final JobGraph jobGraph = createBrokenJobGraph();
+ final TestingFailureEnricher failureEnricher = new
TestingFailureEnricher();
+ final RestartBackoffTimeStrategy restartStrategy =
+ new
FixedDelayRestartBackoffTimeStrategyFactory(Integer.MAX_VALUE, 0L).create();
+ final SchedulerBase scheduler =
+ createScheduler(jobGraph,
Collections.singleton(failureEnricher), restartStrategy);
+ // Triggered failure on initializeJobVertex that should be labeled
+
assertThatThrownBy(scheduler::startScheduling).isInstanceOf(IllegalStateException.class);
+ final Iterable<RootExceptionHistoryEntry> exceptionHistory =
+ scheduler.requestJob().getExceptionHistory();
+ final RootExceptionHistoryEntry failure =
exceptionHistory.iterator().next();
+ assertThat(failure.getException()).hasMessageContaining("The failure
is not recoverable");
+
assertThat(failure.getFailureLabels()).isEqualTo(failureEnricher.getFailureLabels());
+ }
+
@Test
void testAdaptiveBatchScheduler() throws Exception {
JobGraph jobGraph = createJobGraph();
@@ -196,9 +219,7 @@ class AdaptiveBatchSchedulerTest {
.setDelayExecutor(taskRestartExecutor)
.setPartitionTracker(partitionTracker)
.setRestartBackoffTimeStrategy(
- new FixedDelayRestartBackoffTimeStrategy
-
.FixedDelayRestartBackoffTimeStrategyFactory(10, 0)
- .create())
+ new
FixedDelayRestartBackoffTimeStrategyFactory(10, 0).create())
.setVertexParallelismAndInputInfosDecider(
createCustomParallelismDecider(maxParallelism))
.setDefaultMaxParallelism(maxParallelism)
@@ -460,13 +481,31 @@ class AdaptiveBatchSchedulerTest {
}
public JobGraph createJobGraph() {
+ return createJobGraph(false);
+ }
+
+ private JobGraph createBrokenJobGraph() {
+ // this will break the JobGraph by using the same dataset id twice
+ return createJobGraph(true);
+ }
+
+ public JobGraph createJobGraph(boolean broken) {
final JobVertex source1 = createJobVertex("source1",
SOURCE_PARALLELISM_1);
final JobVertex source2 = createJobVertex("source2",
SOURCE_PARALLELISM_2);
final JobVertex sink = createJobVertex("sink", -1);
+ final IntermediateDataSetID sharedDataSetId = new
IntermediateDataSetID();
sink.connectNewDataSetAsInput(
- source1, DistributionPattern.POINTWISE,
ResultPartitionType.BLOCKING);
+ source1,
+ DistributionPattern.POINTWISE,
+ ResultPartitionType.BLOCKING,
+ broken ? sharedDataSetId : new IntermediateDataSetID(),
+ false);
sink.connectNewDataSetAsInput(
- source2, DistributionPattern.POINTWISE,
ResultPartitionType.BLOCKING);
+ source2,
+ DistributionPattern.POINTWISE,
+ ResultPartitionType.BLOCKING,
+ broken ? sharedDataSetId : new IntermediateDataSetID(),
+ false);
return new JobGraph(new JobID(), "test job", source1, source2, sink);
}
@@ -482,11 +521,26 @@ class AdaptiveBatchSchedulerTest {
VertexParallelismAndInputInfosDecider
vertexParallelismAndInputInfosDecider,
int defaultMaxParallelism)
throws Exception {
- return new DefaultSchedulerBuilder(
- jobGraph, mainThreadExecutor,
EXECUTOR_RESOURCE.getExecutor())
- .setDelayExecutor(taskRestartExecutor)
+ return createSchedulerBuilder(jobGraph)
.setVertexParallelismAndInputInfosDecider(vertexParallelismAndInputInfosDecider)
.setDefaultMaxParallelism(defaultMaxParallelism)
.buildAdaptiveBatchJobScheduler();
}
+
+ private SchedulerBase createScheduler(
+ JobGraph jobGraph,
+ Collection<FailureEnricher> failureEnrichers,
+ RestartBackoffTimeStrategy strategy)
+ throws Exception {
+ return createSchedulerBuilder(jobGraph)
+ .setRestartBackoffTimeStrategy(strategy)
+ .setFailureEnrichers(failureEnrichers)
+ .buildAdaptiveBatchJobScheduler();
+ }
+
+ private DefaultSchedulerBuilder createSchedulerBuilder(JobGraph jobGraph) {
+ return new DefaultSchedulerBuilder(
+ jobGraph, mainThreadExecutor,
EXECUTOR_RESOURCE.getExecutor())
+ .setDelayExecutor(taskRestartExecutor);
+ }
}