gaoyunhaii commented on a change in pull request #16432:
URL: https://github.com/apache/flink/pull/16432#discussion_r666640109



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
##########
@@ -370,6 +373,82 @@ private void checkpointEventValueAtomicity(
         }
     }
 
+    @Test
+    public void testFailingCheckpointsIfSendingEventFailed() throws Exception {
+        CompletableFuture<Acknowledge> eventSendingResult = new CompletableFuture<>();
+        final EventReceivingTasks tasks =
+                EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult);
+        final OperatorCoordinatorHolder holder =
+                createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);
+
+        // Send one event without finishing it.
+        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0));
+
+        // Triggers one checkpoint.
+        CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>();
+        holder.checkpointCoordinator(1, checkpointResult);
+        getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]);
+
+        // Fails the event sending.
+        eventSendingResult.completeExceptionally(new RuntimeException("Artificial"));
+
+        assertTrue(eventSendingResult.isCompletedExceptionally());
+    }
+
+    @Test
+    public void testFailingCheckpointIfFailedEventNotProcessed() throws Exception {
+        final ReorderableManualExecutorService executor = new ReorderableManualExecutorService();
+        final ComponentMainThreadExecutor mainThreadExecutor =
+                new ComponentMainThreadExecutorServiceAdapter(
+                        (ScheduledExecutorService) executor, Thread.currentThread());
+
+        CompletableFuture<Acknowledge> eventSendingResult = new CompletableFuture<>();
+        final EventReceivingTasks tasks =
+                EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult);
+
+        final OperatorCoordinatorHolder holder =
+                createCoordinatorHolder(tasks, TestingOperatorCoordinator::new, mainThreadExecutor);
+
+        // Send one event without finishing it.
+        getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0));
+        executor.triggerAll();
+
+        // Finish the event sending. This will insert one runnable that handle
+        // failed events to the executor. And we pending this runnable to

Review comment:
       This should say "delay the runnable". The original intent is to test the
case where a new checkpoint is triggered before the stage that triggers failover
gets executed, so we need some way to delay that stage until the checkpoint has
been triggered.
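
To make the intended ordering concrete, here is a minimal standalone sketch of the idea. It does not use the Flink test utilities: `DelayableManualExecutor`, `setDelaying`, `releaseDelayed`, and `DelayedStageSketch` are hypothetical names invented for this illustration, and the "checkpoint" and "failover" steps are placeholders that only record the order in which they run.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical manually triggered executor; a stand-in, not Flink's test executor. */
final class DelayableManualExecutor implements Executor {
    private final Deque<Runnable> ready = new ArrayDeque<>();
    private final Deque<Runnable> delayed = new ArrayDeque<>();
    private boolean delaying;

    /** While enabled, newly submitted runnables are parked instead of queued. */
    void setDelaying(boolean delaying) {
        this.delaying = delaying;
    }

    @Override
    public void execute(Runnable command) {
        (delaying ? delayed : ready).add(command);
    }

    /** Runs every queued runnable, including ones queued while running. */
    void triggerAll() {
        Runnable next;
        while ((next = ready.poll()) != null) {
            next.run();
        }
    }

    /** Moves the parked runnables to the end of the ready queue. */
    void releaseDelayed() {
        ready.addAll(delayed);
        delayed.clear();
    }
}

final class DelayedStageSketch {
    static List<String> run() {
        List<String> order = new ArrayList<>();
        DelayableManualExecutor executor = new DelayableManualExecutor();
        CompletableFuture<Void> eventSendingResult = new CompletableFuture<>();

        // Placeholder for the stage that reacts to a failed event send.
        eventSendingResult.whenCompleteAsync(
                (ignored, error) -> {
                    if (error != null) {
                        order.add("failover");
                    }
                },
                executor);

        // Failing the future submits the stage above; park it instead of queueing it.
        executor.setDelaying(true);
        eventSendingResult.completeExceptionally(new RuntimeException("Artificial"));
        executor.setDelaying(false);

        // Placeholder for triggering a checkpoint before the failover stage runs.
        executor.execute(() -> order.add("checkpoint"));
        executor.triggerAll();

        // Only now let the delayed failover stage run.
        executor.releaseDelayed();
        executor.triggerAll();
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [checkpoint, failover]
    }
}
```

The point of the sketch is only the ordering: the failure-handling stage is submitted when the future fails, but it runs after the checkpoint step because the executor holds it back until it is explicitly released.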




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
