StephanEwen commented on a change in pull request #14158:
URL: https://github.com/apache/flink/pull/14158#discussion_r529318929



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
##########
@@ -280,6 +281,36 @@ public void takeCheckpointAfterAbortedCheckpoint() throws Exception {
                ));
        }
 
+       @Test
+       public void testFailingJobMultipleTimesNotCauseCascadingJobFailure() throws Exception {
+               Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorProvider =
+                       context -> new TestingOperatorCoordinator(context) {
+                               @Override
+                               public void handleEventFromOperator(int subtask, OperatorEvent event) {
+                                       context.failJob(new RuntimeException("Artificial Exception"));
+                               }
+                       };
+               final TestEventSender sender = new TestEventSender();
+               final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, coordinatorProvider);
+
+               holder.handleEventFromOperator(0, new TestOperatorEvent());
+               assertNotNull(globalFailure);
+               final Throwable firstGlobalFailure = globalFailure;
+
+               holder.handleEventFromOperator(1, new TestOperatorEvent());
+               assertEquals("The global failure should be the same instance because the context"

Review comment:
       Nit: I think we want referential equality here (`assertSame`) rather than semantic equality (`assertEquals`).
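
To illustrate the distinction in the comment above, here is a minimal sketch in plain Java (no JUnit dependency). It assumes that `assertEquals` boils down to an `equals()` comparison and `assertSame` to an `==` comparison. The `Failure` class and both helper methods are hypothetical names introduced only for this example; they are not part of Flink or of the PR.

```java
import java.util.Objects;

public class SameVersusEquals {

    /** Hypothetical value type whose equals() compares content, not identity. */
    public static final class Failure {
        private final String message;

        public Failure(String message) {
            this.message = message;
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof Failure && ((Failure) other).message.equals(message);
        }

        @Override
        public int hashCode() {
            return message.hashCode();
        }
    }

    /** The kind of check assertEquals performs: delegates to equals(). */
    public static boolean semanticallyEqual(Object expected, Object actual) {
        return Objects.equals(expected, actual);
    }

    /** The kind of check assertSame performs: reference identity. */
    public static boolean sameInstance(Object expected, Object actual) {
        return expected == actual;
    }

    public static void main(String[] args) {
        Failure first = new Failure("Artificial Exception");
        Failure second = new Failure("Artificial Exception");

        System.out.println(semanticallyEqual(first, second)); // same content
        System.out.println(sameInstance(first, second));      // two distinct objects
        System.out.println(sameInstance(first, first));       // one object compared with itself
    }
}
```

Note that `java.lang.Throwable` does not override `equals()`, so for exception objects both assertions would likely pass or fail together; `assertSame` mainly states the intent ("the very same instance") explicitly.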

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##########
@@ -139,7 +142,7 @@ public void lazyInitialize(SchedulerNG scheduler, ComponentMainThreadExecutor ma
        }
 
        @VisibleForTesting
-       void lazyInitialize(Consumer<Throwable> globalFailureHandler, ComponentMainThreadExecutor mainThreadExecutor) {
+       public void lazyInitialize(Consumer<Throwable> globalFailureHandler, ComponentMainThreadExecutor mainThreadExecutor) {

Review comment:
       Looks like this visibility increase is not necessary.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##########
@@ -401,6 +409,10 @@ private void checkInitialized() {
                        checkState(isInitialized(), "Context was not yet initialized");
                }
 
+               private void resetFailed() {

Review comment:
       Nit: The other methods called from the Holder class are all package-private to avoid compiler-generated accessor ("bridge") methods.
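
As a rough sketch of what this comment refers to: when an enclosing class calls a `private` method of a nested class, older Java compilers (targets before Java 11) typically emit a synthetic accessor method to make the call legal at the bytecode level; from Java 11 on, nest-based access control makes that unnecessary. The class and method names below are hypothetical and only mirror the shape of the code under review. The reflection helper lists whatever synthetic methods the compiler happened to generate, so its result depends on the JDK used.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class AccessorSketch {

    /** Hypothetical stand-in for a nested context class. */
    static final class Context {
        private boolean failed = true;

        // Private: a call from the enclosing class may require a synthetic accessor.
        private void resetPrivately() {
            failed = false;
        }

        // Package-private: the enclosing class can call this directly on any JDK.
        void resetFailed() {
            failed = false;
        }

        boolean isFailed() {
            return failed;
        }
    }

    /** Calls the private method from the enclosing class. */
    static boolean failedAfterPrivateReset() {
        Context context = new Context();
        context.resetPrivately();
        return context.isFailed();
    }

    /** Calls the package-private method from the enclosing class. */
    static boolean failedAfterPackagePrivateReset() {
        Context context = new Context();
        context.resetFailed();
        return context.isFailed();
    }

    /** Lists the names of compiler-generated (synthetic) methods on Context, if any. */
    static List<String> syntheticMethodsOfContext() {
        List<String> names = new ArrayList<>();
        for (Method method : Context.class.getDeclaredMethods()) {
            if (method.isSynthetic()) {
                names.add(method.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(failedAfterPrivateReset());
        System.out.println(failedAfterPackagePrivateReset());
        System.out.println(syntheticMethodsOfContext()); // JDK-dependent
    }
}
```

Both variants behave identically at runtime; the difference is only in the generated bytecode, which is why the comment is marked as a nit.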

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##########
@@ -325,7 +331,7 @@ public static OperatorCoordinatorHolder create(
        }
 
        @VisibleForTesting
-       static OperatorCoordinatorHolder create(
+       public static OperatorCoordinatorHolder create(

Review comment:
       Looks like this visibility increase is not necessary.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##########
@@ -381,6 +388,7 @@ public LazyInitializedCoordinatorContext(
                        this.eventValve = checkNotNull(eventValve);
                        this.operatorName = checkNotNull(operatorName);
                        this.operatorParallelism = operatorParallelism;
+                       this.failed = false;

Review comment:
       Nit: This is not necessary; `false` is the default value of a `boolean` field.
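
A small self-contained sketch of the language rule behind this comment: instance fields that are never assigned receive their type's default value. The `Flags` class and its field names are hypothetical and exist only for this illustration.

```java
public class DefaultFieldValues {

    /** Hypothetical holder whose fields are never explicitly initialized. */
    public static final class Flags {
        boolean failed;   // defaults to false
        int attempts;     // defaults to 0
        Object cause;     // defaults to null
    }

    public static void main(String[] args) {
        Flags flags = new Flags();
        System.out.println(flags.failed);   // false
        System.out.println(flags.attempts); // 0
        System.out.println(flags.cause);    // null
    }
}
```

An explicit `this.failed = false;` in a constructor therefore only restates the default, unless the field was assigned something else earlier in the same constructor.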





