This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f7018642a3677434a92c9a1c8d124ffb157edcdc
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Sat Nov 21 22:08:04 2020 +0800

    [FLINK-19535][runtime] Add a failed flag in the OperatorCoordinator to avoid failing the job multiple times.
    
    This closes #14158
---
 .../coordination/OperatorCoordinatorHolder.java    | 20 +++++++++++++++
 .../OperatorCoordinatorHolderTest.java             | 30 ++++++++++++++++++++++
 2 files changed, 50 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index fa86de0..e5844ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -32,6 +32,9 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TemporaryClassLoaderContext;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -231,6 +234,9 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
                // execution graph construction, before the main thread 
executor is set
 
                eventValve.reset();
+               if (context != null) {
+                       context.resetFailed();
+               }
                coordinator.resetToCheckpoint(checkpointData);
        }
 
@@ -366,6 +372,8 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
         */
        private static final class LazyInitializedCoordinatorContext implements 
OperatorCoordinator.Context {
 
+               private static final Logger LOG = 
LoggerFactory.getLogger(LazyInitializedCoordinatorContext.class);
+
                private final OperatorID operatorId;
                private final OperatorEventValve eventValve;
                private final String operatorName;
@@ -375,6 +383,8 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
                private Consumer<Throwable> globalFailureHandler;
                private Executor schedulerExecutor;
 
+               private volatile boolean failed;
+
                public LazyInitializedCoordinatorContext(
                                final OperatorID operatorId,
                                final OperatorEventValve eventValve,
@@ -406,6 +416,10 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
                        checkState(isInitialized(), "Context was not yet 
initialized");
                }
 
+               void resetFailed() {
+                       this.failed = false; // let failJob() propagate failures again, e.g. after resetToCheckpoint()
+               }
+
                @Override
                public OperatorID getOperatorId() {
                        return operatorId;
@@ -436,6 +450,12 @@ public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorC
                @Override
                public void failJob(final Throwable cause) {
                        checkInitialized();
+                       if (failed) {
+                               LOG.warn("Ignoring the request to fail job 
because the job is already failing. "
+                                                       + "The ignored failure 
cause is", cause);
+                               return;
+                       }
+                       failed = true;
 
                        final FlinkException e = new FlinkException("Global 
failure triggered by OperatorCoordinator for '" +
                                operatorName + "' (operator " + operatorId + 
").", cause);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
index 5962afa..3f02bb7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
@@ -48,6 +48,7 @@ import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -280,6 +281,35 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
                ));
        }
 
+       @Test
+       public void testFailingJobMultipleTimesNotCauseCascadingJobFailure() throws Exception {
+               Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorProvider =
+                       context -> new TestingOperatorCoordinator(context) {
+                               @Override
+                               public void handleEventFromOperator(int subtask, OperatorEvent event) {
+                                       context.failJob(new RuntimeException("Artificial Exception")); // every event fails the job
+                               }
+                       };
+               final TestEventSender sender = new TestEventSender();
+               final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, coordinatorProvider);
+
+               holder.handleEventFromOperator(0, new TestOperatorEvent());
+               assertNotNull(globalFailure);
+               final Throwable firstGlobalFailure = globalFailure;
+
+               holder.handleEventFromOperator(1, new TestOperatorEvent()); // second failJob() must be ignored
+               assertEquals("The global failure should be the same instance because the context "
+                                               + "should only take the first request from the coordinator to fail the job.",
+                               firstGlobalFailure, globalFailure);
+
+               holder.resetToCheckpoint(new byte[0]); // clears the context's failed flag
+               holder.handleEventFromOperator(1, new TestOperatorEvent());
+               assertNotEquals("The new failures should be propagated after the coordinator "
+                                                       + "is reset.", firstGlobalFailure, globalFailure);
+               // Clear the expected failure so the after-test check does not report it.
+               globalFailure = null;
+       }
+
        /**
         * This test verifies that the order of Checkpoint Completion and Event 
Sending observed from the
         * outside matches that from within the OperatorCoordinator.

Reply via email to