tillrohrmann commented on a change in pull request #16432:
URL: https://github.com/apache/flink/pull/16432#discussion_r667754578



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
##########
@@ -370,6 +373,70 @@ private void checkpointEventValueAtomicity(
         }
     }
 
+    @Test
+    public void testFailingCheckpointIfSendingEventFailedAfterTrigger() throws Exception {
+        CompletableFuture<Acknowledge> eventSendingResult = new CompletableFuture<>();
+        final EventReceivingTasks tasks =
+                EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult);
+        final OperatorCoordinatorHolder holder =
+                createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);
+
+        // Send one event without finishing it.
+        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0));
+
+        // Triggers one checkpoint.

Review comment:
       ```suggestion
           // Trigger one checkpoint.
   ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
##########
@@ -370,6 +373,70 @@ private void checkpointEventValueAtomicity(
         }
     }
 
+    @Test
+    public void testFailingCheckpointIfSendingEventFailedAfterTrigger() throws Exception {
+        CompletableFuture<Acknowledge> eventSendingResult = new CompletableFuture<>();
+        final EventReceivingTasks tasks =
+                EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult);
+        final OperatorCoordinatorHolder holder =
+                createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);
+
+        // Send one event without finishing it.
+        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0));
+
+        // Triggers one checkpoint.
+        CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>();
+        holder.checkpointCoordinator(1, checkpointResult);
+        getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]);
+
+        // Fails the event sending.

Review comment:
       ```suggestion
           // Fail the sent event.
   ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
##########
@@ -370,6 +373,70 @@ private void checkpointEventValueAtomicity(
         }
     }
 
+    @Test
+    public void testFailingCheckpointIfSendingEventFailedAfterTrigger() throws Exception {
+        CompletableFuture<Acknowledge> eventSendingResult = new CompletableFuture<>();
+        final EventReceivingTasks tasks =
+                EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult);
+        final OperatorCoordinatorHolder holder =
+                createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);
+
+        // Send one event without finishing it.
+        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0));
+
+        // Triggers one checkpoint.
+        CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>();
+        holder.checkpointCoordinator(1, checkpointResult);
+        getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]);
+
+        // Fails the event sending.
+        eventSendingResult.completeExceptionally(new RuntimeException("Artificial"));
+
+        assertTrue(checkpointResult.isCompletedExceptionally());
+    }
+
+    @Test
+    public void testFailingCheckpointIfSendingEventFailedBeforeTrigger() throws Exception {

Review comment:
       ```suggestion
       public void testCheckpointFailsIfSendingEventFailedBeforeTrigger() throws Exception {
   ```
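The two renamed tests pin down one contract: a checkpoint fails if an event sent before it cannot be delivered, whether the send fails before or after the checkpoint is triggered. Below is a toy model of that contract in plain Java using only `CompletableFuture`. `ToyCheckpointGateway` and its methods are hypothetical and are not Flink's `OperatorCoordinatorHolder` or `SubtaskGatewayImpl`; the mechanism (the checkpoint result waits for every event sent since the previous checkpoint) is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical toy model: checkpoints complete only after earlier events were acknowledged. */
final class ToyCheckpointGateway {

    // Send results of all events sent since the last checkpoint was triggered.
    private final List<CompletableFuture<Void>> eventsSinceLastCheckpoint = new ArrayList<>();

    /** Records one event send; the caller completes the returned future when the "RPC" ends. */
    public CompletableFuture<Void> sendEvent() {
        CompletableFuture<Void> sendResult = new CompletableFuture<>();
        eventsSinceLastCheckpoint.add(sendResult);
        return sendResult;
    }

    /**
     * Triggers a checkpoint. The result completes with the coordinator state once every event
     * sent before the trigger has succeeded; if any of them failed (before or after the
     * trigger), the result completes exceptionally.
     */
    public CompletableFuture<byte[]> triggerCheckpoint(CompletableFuture<byte[]> coordinatorState) {
        CompletableFuture<Void> allEventsAcked =
                CompletableFuture.allOf(eventsSinceLastCheckpoint.toArray(new CompletableFuture<?>[0]));
        eventsSinceLastCheckpoint.clear();
        return allEventsAcked.thenCombine(coordinatorState, (ignored, state) -> state);
    }

    public static void main(String[] args) {
        ToyCheckpointGateway gateway = new ToyCheckpointGateway();

        // The event fails after the checkpoint was triggered and the state was ready.
        CompletableFuture<Void> event = gateway.sendEvent();
        CompletableFuture<byte[]> state = new CompletableFuture<>();
        CompletableFuture<byte[]> first = gateway.triggerCheckpoint(state);
        state.complete(new byte[0]);
        event.completeExceptionally(new RuntimeException("Artificial"));
        System.out.println(first.isCompletedExceptionally()); // true

        // The event fails before the next checkpoint is triggered.
        gateway.sendEvent().completeExceptionally(new RuntimeException("Artificial"));
        CompletableFuture<byte[]> second =
                gateway.triggerCheckpoint(CompletableFuture.completedFuture(new byte[0]));
        System.out.println(second.isCompletedExceptionally()); // true

        // All events acknowledged: the checkpoint succeeds.
        gateway.sendEvent().complete(null);
        CompletableFuture<byte[]> third =
                gateway.triggerCheckpoint(CompletableFuture.completedFuture(new byte[0]));
        System.out.println(third.isCompletedExceptionally()); // false
    }
}
```

In this model the "after trigger" case mirrors the test above: the coordinator's state is already available, yet the checkpoint stays pending until the in-flight event resolves, and it then fails with that event.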

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
##########
@@ -80,11 +86,14 @@
                                                 subtaskAccess.subtaskName());
                                 subtaskAccess.triggerTaskFailover(new FlinkException(msg, failure));

Review comment:
       I think we should guard against this method throwing an exception. If it does throw, we should fail hard. This ensures that we don't swallow this failure as a send-event failure.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
##########
@@ -370,6 +373,70 @@ private void checkpointEventValueAtomicity(
         }
     }
 
+    @Test
+    public void testFailingCheckpointIfSendingEventFailedAfterTrigger() throws Exception {

Review comment:
       ```suggestion
       public void testCheckpointFailsIfSendingEventFailedAfterTrigger() throws Exception {
   ```

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
##########
@@ -80,11 +86,14 @@
                                                 subtaskAccess.subtaskName());
                                 subtaskAccess.triggerTaskFailover(new FlinkException(msg, failure));

Review comment:
       Maybe we can add a util `Runnables.assertNoException(Runnable)` that passes any exception thrown by the `Runnable` to `FatalExitExceptionHandler.INSTANCE`.
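A minimal sketch of the proposed helper in plain Java. `RunnablesSketch` is a hypothetical class, and the extra `Thread.UncaughtExceptionHandler` parameter is an assumption added so the sketch runs without Flink and can be exercised without exiting the JVM. In Flink the handler would be `FatalExitExceptionHandler.INSTANCE`, which implements `Thread.UncaughtExceptionHandler` and is meant to terminate the process. The wrapper routes anything the runnable throws to that handler, so a failure inside `triggerTaskFailover(...)` cannot leak into the send-event failure path.

```java
/** Hypothetical sketch of the proposed Runnables.assertNoException helper. */
final class RunnablesSketch {

    private RunnablesSketch() {}

    /**
     * Wraps {@code runnable} so that any Throwable it throws is handed to {@code fatalHandler}
     * instead of propagating to the caller, where it could be mistaken for an ordinary
     * send-event failure. Assumption: in Flink, fatalHandler = FatalExitExceptionHandler.INSTANCE.
     */
    static Runnable assertNoException(Runnable runnable, Thread.UncaughtExceptionHandler fatalHandler) {
        return () -> {
            try {
                runnable.run();
            } catch (Throwable t) {
                fatalHandler.uncaughtException(Thread.currentThread(), t);
            }
        };
    }

    public static void main(String[] args) {
        // Stand-in for triggerTaskFailover(...) failing unexpectedly.
        Runnable failingFailover = () -> {
            throw new IllegalStateException("failover failed");
        };
        Runnable guarded =
                assertNoException(
                        failingFailover,
                        (thread, error) -> System.out.println("fatal: " + error.getMessage()));
        guarded.run(); // prints "fatal: failover failed"
    }
}
```

At the call site from the hunk above, this would roughly become `assertNoException(() -> subtaskAccess.triggerTaskFailover(new FlinkException(msg, failure)), FatalExitExceptionHandler.INSTANCE).run()`, with the second argument dropped if the helper hard-wires the handler as the comment suggests.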




-- 
This is an automated message from the Apache Git Service.