gaoyunhaii commented on a change in pull request #16432:
URL: https://github.com/apache/flink/pull/16432#discussion_r666640109
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
##########
@@ -370,6 +373,82 @@ private void checkpointEventValueAtomicity(
}
}
+ @Test
+ public void testFailingCheckpointsIfSendingEventFailed() throws Exception {
+ CompletableFuture<Acknowledge> eventSendingResult = new CompletableFuture<>();
+ final EventReceivingTasks tasks =
+ EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult);
+ final OperatorCoordinatorHolder holder =
+ createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);
+
+ // Send one event without finishing it.
+ getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0));
+
+ // Triggers one checkpoint.
+ CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>();
+ holder.checkpointCoordinator(1, checkpointResult);
+ getCoordinator(holder).getLastTriggeredCheckpoint().complete(new byte[0]);
+
+ // Fails the event sending.
+ eventSendingResult.completeExceptionally(new RuntimeException("Artificial"));
+
+ assertTrue(checkpointResult.isCompletedExceptionally());
+ }
+
+ @Test
+ public void testFailingCheckpointIfFailedEventNotProcessed() throws Exception {
+ final ReorderableManualExecutorService executor = new ReorderableManualExecutorService();
+ final ComponentMainThreadExecutor mainThreadExecutor =
+ new ComponentMainThreadExecutorServiceAdapter(
+ (ScheduledExecutorService) executor, Thread.currentThread());
+
+ CompletableFuture<Acknowledge> eventSendingResult = new CompletableFuture<>();
+ final EventReceivingTasks tasks =
+ EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult);
+
+ final OperatorCoordinatorHolder holder =
+ createCoordinatorHolder(tasks, TestingOperatorCoordinator::new, mainThreadExecutor);
+
+ // Send one event without finishing it.
+ getCoordinator(holder).getSubtaskGateway(0).sendEvent(new TestOperatorEvent(0));
+ executor.triggerAll();
+
+ // Finish the event sending. This will insert one runnable that handle
+ // failed events to the executor. And we pending this runnable to
Review comment:
This should read "delay the runnable". The intent is to test the case where a
new checkpoint is triggered before the stage that triggers failover gets
executed, so we need some way to delay that stage until the checkpoint has
been triggered.
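
To make the intended ordering concrete, here is a minimal standalone sketch of the idea. It does not use Flink's `ReorderableManualExecutorService`; `ManualExecutor`, `delayNext`, and `DelayedFailoverSketch` are hypothetical names made up for illustration, and the "failover" and "checkpoint" steps are just log entries. The only assumption is that failure handling is scheduled on a manually driven executor when the send future fails, so the test can push that runnable behind a later one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical manually driven executor; a stand-in, not Flink's class. */
class ManualExecutor implements Executor {
    private final Deque<Runnable> queue = new ArrayDeque<>();

    @Override
    public void execute(Runnable command) {
        // Nothing runs until the test explicitly triggers it.
        queue.addLast(command);
    }

    /** Moves the oldest queued runnable to the back, so later submissions run first. */
    void delayNext() {
        Runnable head = queue.pollFirst();
        if (head != null) {
            queue.addLast(head);
        }
    }

    /** Runs everything currently queued, in queue order. */
    void triggerAll() {
        Runnable next;
        while ((next = queue.pollFirst()) != null) {
            next.run();
        }
    }
}

class DelayedFailoverSketch {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ManualExecutor executor = new ManualExecutor();
        CompletableFuture<Void> eventSendingResult = new CompletableFuture<>();

        // Failure handling is enqueued on the executor once the send fails.
        eventSendingResult.whenCompleteAsync(
                (ignored, failure) -> {
                    if (failure != null) {
                        log.add("failover");
                    }
                },
                executor);

        // Fail the send: the failover runnable is now queued but has not run.
        eventSendingResult.completeExceptionally(new RuntimeException("Artificial"));

        // A checkpoint trigger arrives while the failover is still pending.
        executor.execute(() -> log.add("checkpoint triggered"));

        // Delay the failover runnable so the checkpoint trigger runs first.
        executor.delayNext();
        executor.triggerAll();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With this ordering the checkpoint is triggered while the failed event is still unprocessed, which is the window the second test is meant to cover.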