gaoyunhaii commented on a change in pull request #16432:
URL: https://github.com/apache/flink/pull/16432#discussion_r666353812
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
##########
@@ -72,19 +79,28 @@
         FutureUtils.assertNoException(
                 result.handleAsync(
                         (success, failure) -> {
-                            if (failure != null && subtaskAccess.isStillRunning()) {
-                                String msg =
-                                        String.format(
-                                                EVENT_LOSS_ERROR_MESSAGE,
-                                                evt,
-                                                subtaskAccess.subtaskName());
-                                subtaskAccess.triggerTaskFailover(new FlinkException(msg, failure));
+                            if (failure != null) {
+                                if (subtaskAccess.isStillRunning()) {
+                                    String msg =
+                                            String.format(
+                                                    EVENT_LOSS_ERROR_MESSAGE,
+                                                    evt,
+                                                    subtaskAccess.subtaskName());
+                                    subtaskAccess.triggerTaskFailover(
+                                            new FlinkException(msg, failure));
+                                }
+
+                                nonSuccessFuturesTrack.removeFailedFuture(result);
Review comment:
Hi Till, thanks very much for the review! First, regarding this issue: my initial
concern with this approach is that it might cause a deadlock:
1. When a checkpoint reaches `completeCheckpointOnceEventsAreDone`, it blocks
the main thread and waits until all pending event futures are done (with this
approach, the tracked future would be the one returned by the `whenAsync` stage).
2. When the event-sending result future completes, the thread that completes it
would then try to submit the `whenAsync` stage to the main thread, but that stage
never runs because the main thread is blocked. The main thread waits for the
future and the future waits for the main thread, so neither can make progress.
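The cycle described above can be reproduced outside Flink with plain `CompletableFuture`s. This is a minimal sketch, assuming a single-threaded `ExecutorService` as a hypothetical stand-in for the coordinator's main thread and a bare future as a stand-in for the event-sending result; the class and method names are hypothetical and this is not Flink's actual code. The main-thread task waits with a timeout only so the sketch terminates; a real blocking wait would hang.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class MainThreadDeadlockSketch {

    /**
     * Returns true if the simulated main thread timed out waiting for the dependent
     * stage (the deadlock pattern), false if the dependent stage completed.
     *
     * @param scheduleOnMainThread if true, the dependent stage is scheduled with
     *     handleAsync on the main-thread executor; if false, the synchronous handle is used
     */
    public static boolean waitTimesOut(boolean scheduleOnMainThread) throws Exception {
        // Hypothetical stand-in for the coordinator's single main thread.
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        // Hypothetical stand-in for the event-sending result future.
        CompletableFuture<Void> sendResult = new CompletableFuture<>();

        // The future a checkpoint would wait on before completing.
        CompletableFuture<Void> tracked;
        if (scheduleOnMainThread) {
            tracked = sendResult.handleAsync((success, failure) -> null, mainThread);
        } else {
            tracked = sendResult.handle((success, failure) -> null);
        }

        // Simulates the blocking wait: a main-thread task that waits for the tracked
        // future (bounded by a timeout so the sketch always terminates).
        CompletableFuture<Boolean> outcome = new CompletableFuture<>();
        mainThread.execute(
                () -> {
                    try {
                        tracked.get(500, TimeUnit.MILLISECONDS);
                        outcome.complete(false);
                    } catch (TimeoutException e) {
                        outcome.complete(true);
                    } catch (Exception e) {
                        outcome.completeExceptionally(e);
                    }
                });

        // Another thread completes the send result. With handleAsync, the dependent
        // stage is queued on the main-thread executor behind the blocked task above.
        Thread completer = new Thread(() -> sendResult.complete(null));
        completer.start();

        boolean timedOut = outcome.get();
        completer.join();
        mainThread.shutdown();
        mainThread.awaitTermination(1, TimeUnit.SECONDS);
        return timedOut;
    }
}
```

`waitTimesOut(true)` reproduces the pattern from step 2: the dependent stage sits in the executor queue while the only executor thread waits for it. `waitTimesOut(false)` completes because a non-async `handle` stage registered before completion runs on the thread that completes the source future, so it never needs the blocked thread.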
--
This is an automated message from the Apache Git Service.