This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit f7018642a3677434a92c9a1c8d124ffb157edcdc Author: Jiangjie (Becket) Qin <[email protected]> AuthorDate: Sat Nov 21 22:08:04 2020 +0800 [FLINK-19535][runtime] Add a failed flag in the OperatorCoordinator to avoid failing the job multiple times. This closes #14158 --- .../coordination/OperatorCoordinatorHolder.java | 20 +++++++++++++++ .../OperatorCoordinatorHolderTest.java | 30 ++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java index fa86de0..e5844ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java @@ -32,6 +32,9 @@ import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TemporaryClassLoaderContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.io.IOException; @@ -231,6 +234,9 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC // execution graph construction, before the main thread executor is set eventValve.reset(); + if (context != null) { + context.resetFailed(); + } coordinator.resetToCheckpoint(checkpointData); } @@ -366,6 +372,8 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC */ private static final class LazyInitializedCoordinatorContext implements OperatorCoordinator.Context { + private static final Logger LOG = LoggerFactory.getLogger(LazyInitializedCoordinatorContext.class); + private final OperatorID operatorId; private final OperatorEventValve eventValve; private final String operatorName; @@ -375,6 +383,8 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC private Consumer<Throwable> globalFailureHandler; private Executor schedulerExecutor; + private volatile boolean failed; + public LazyInitializedCoordinatorContext( final OperatorID operatorId, final OperatorEventValve eventValve, @@ -406,6 +416,10 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC checkState(isInitialized(), "Context was not yet initialized"); } + void resetFailed() { + failed = false; + } + @Override public OperatorID getOperatorId() { return operatorId; @@ -436,6 +450,12 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC @Override public void failJob(final Throwable cause) { checkInitialized(); + if (failed) { + LOG.warn("Ignoring the request to fail job because the job is already failing. " + + "The ignored failure cause is", cause); + return; + } + failed = true; final FlinkException e = new FlinkException("Global failure triggered by OperatorCoordinator for '" + operatorName + "' (operator " + operatorId + ").", cause); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java index 5962afa..3f02bb7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java @@ -48,6 +48,7 @@ import static org.hamcrest.Matchers.contains; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -280,6 +281,35 @@ public class OperatorCoordinatorHolderTest extends TestLogger { )); } + @Test + public void testFailingJobMultipleTimesNotCauseCascadingJobFailure() throws Exception { + Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorProvider = + context -> new TestingOperatorCoordinator(context) { + @Override + public void handleEventFromOperator(int subtask, OperatorEvent event) { + context.failJob(new RuntimeException("Artificial Exception")); + } + }; + final TestEventSender sender = new TestEventSender(); + final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, coordinatorProvider); + + holder.handleEventFromOperator(0, new TestOperatorEvent()); + assertNotNull(globalFailure); + final Throwable firstGlobalFailure = globalFailure; + + holder.handleEventFromOperator(1, new TestOperatorEvent()); + assertEquals("The global failure should be the same instance because the context" + + "should only take the first request from the coordinator to fail the job.", + firstGlobalFailure, globalFailure); + + holder.resetToCheckpoint(new byte[0]); + holder.handleEventFromOperator(1, new TestOperatorEvent()); + assertNotEquals("The new failures should be propagated after the coordinator " + + "is reset.", firstGlobalFailure, globalFailure); + // Reset global failure to null to make the after method check happy. + globalFailure = null; + } + /** * This test verifies that the order of Checkpoint Completion and Event Sending observed from the * outside matches that from within the OperatorCoordinator.
