tillrohrmann commented on a change in pull request #16432:
URL: https://github.com/apache/flink/pull/16432#discussion_r667754578
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
##########
@@ -370,6 +373,70 @@ private void checkpointEventValueAtomicity(
}
}
+ @Test
+ public void testFailingCheckpointIfSendingEventFailedAfterTrigger() throws
Exception {
+ CompletableFuture<Acknowledge> eventSendingResult = new
CompletableFuture<>();
+ final EventReceivingTasks tasks =
+
EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult);
+ final OperatorCoordinatorHolder holder =
+ createCoordinatorHolder(tasks,
TestingOperatorCoordinator::new);
+
+ // Send one event without finishing it.
+ getCoordinator(holder).getSubtaskGateway(0).sendEvent(new
TestOperatorEvent(0));
+
+ // Triggers one checkpoint.
Review comment:
```suggestion
// Trigger one checkpoint.
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
##########
@@ -370,6 +373,70 @@ private void checkpointEventValueAtomicity(
}
}
+ @Test
+ public void testFailingCheckpointIfSendingEventFailedAfterTrigger() throws
Exception {
+ CompletableFuture<Acknowledge> eventSendingResult = new
CompletableFuture<>();
+ final EventReceivingTasks tasks =
+
EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult);
+ final OperatorCoordinatorHolder holder =
+ createCoordinatorHolder(tasks,
TestingOperatorCoordinator::new);
+
+ // Send one event without finishing it.
+ getCoordinator(holder).getSubtaskGateway(0).sendEvent(new
TestOperatorEvent(0));
+
+ // Triggers one checkpoint.
+ CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>();
+ holder.checkpointCoordinator(1, checkpointResult);
+ getCoordinator(holder).getLastTriggeredCheckpoint().complete(new
byte[0]);
+
+ // Fails the event sending.
Review comment:
```suggestion
// Fail the sent event.
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
##########
@@ -370,6 +373,70 @@ private void checkpointEventValueAtomicity(
}
}
+ @Test
+ public void testFailingCheckpointIfSendingEventFailedAfterTrigger() throws
Exception {
+ CompletableFuture<Acknowledge> eventSendingResult = new
CompletableFuture<>();
+ final EventReceivingTasks tasks =
+
EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult);
+ final OperatorCoordinatorHolder holder =
+ createCoordinatorHolder(tasks,
TestingOperatorCoordinator::new);
+
+ // Send one event without finishing it.
+ getCoordinator(holder).getSubtaskGateway(0).sendEvent(new
TestOperatorEvent(0));
+
+ // Triggers one checkpoint.
+ CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>();
+ holder.checkpointCoordinator(1, checkpointResult);
+ getCoordinator(holder).getLastTriggeredCheckpoint().complete(new
byte[0]);
+
+ // Fails the event sending.
+ eventSendingResult.completeExceptionally(new
RuntimeException("Artificial"));
+
+ assertTrue(checkpointResult.isCompletedExceptionally());
+ }
+
+ @Test
+ public void testFailingCheckpointIfSendingEventFailedBeforeTrigger()
throws Exception {
Review comment:
```suggestion
public void testCheckpointFailsIfSendingEventFailedBeforeTrigger()
throws Exception {
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
##########
@@ -80,11 +86,14 @@
subtaskAccess.subtaskName());
subtaskAccess.triggerTaskFailover(new
FlinkException(msg, failure));
Review comment:
I think we should guard against this method throwing an exception. If
it does, then we should fail hard. This will ensure that we don't swallow such
a failure as a send-event failure.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
##########
@@ -370,6 +373,70 @@ private void checkpointEventValueAtomicity(
}
}
+ @Test
+ public void testFailingCheckpointIfSendingEventFailedAfterTrigger() throws
Exception {
Review comment:
```suggestion
public void testCheckpointFailsIfSendingEventFailedAfterTrigger() throws
Exception {
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
##########
@@ -80,11 +86,14 @@
subtaskAccess.subtaskName());
subtaskAccess.triggerTaskFailover(new
FlinkException(msg, failure));
Review comment:
Maybe we can add a utility method `Runnables.assertNoException(Runnable)` that
invokes `FatalExitExceptionHandler.INSTANCE` if the runnable throws.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]