becketqin commented on a change in pull request #13044:
URL: https://github.com/apache/flink/pull/13044#discussion_r465535249
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -530,15 +530,22 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
request.getOnCompletionFuture()),
timer);
- final CompletableFuture<?> masterStatesComplete = pendingCheckpointCompletableFuture
- .thenCompose(this::snapshotMasterState);
-
final CompletableFuture<?> coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
.thenComposeAsync((pendingCheckpoint) ->
OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
coordinatorsToCheckpoint, pendingCheckpoint, timer),
timer);
+ // We have to take the snapshot of the master hooks after the coordinator checkpoints have completed.
+ // This is to ensure the tasks are checkpointed after the OperatorCoordinators in case
+ // ExternallyInducedSource is used.
+ final CompletableFuture<?> masterStatesComplete = coordinatorCheckpointsComplete
+ .thenComposeAsync(ignored -> {
+ PendingCheckpoint checkpoint = FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
Review comment:
Exceptions are handled centrally at line 552.
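A minimal sketch, using only `java.util.concurrent`, of why the unchecked read can be safe: the lambda only runs once `coordinatorCheckpointsComplete` completes normally, and that future is derived from `pendingCheckpointCompletableFuture`, so the pending checkpoint future must already have completed normally as well. If an upstream stage fails, the lambda is skipped and the failure flows to a single terminal handler. `CompletableFuture.getNow(null)` stands in for Flink's `FutureUtils.getWithoutException`; the class, method and string values are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public class CentralErrorHandlingSketch {

    // Records whether the master-state stage was actually invoked.
    static final AtomicBoolean masterStageRan = new AtomicBoolean();

    static String runChain(boolean failPending) {
        masterStageRan.set(false);
        ExecutorService timer = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> pending = new CompletableFuture<>();

            // Derived from 'pending', like coordinatorCheckpointsComplete in the diff.
            CompletableFuture<Void> coordinatorsDone = pending.thenComposeAsync(
                    p -> CompletableFuture.<Void>completedFuture(null), timer);

            // Runs only if coordinatorsDone (and therefore 'pending') completed normally,
            // so reading 'pending' without handling its exception is safe here.
            CompletableFuture<String> masterDone = coordinatorsDone.thenComposeAsync(ignored -> {
                masterStageRan.set(true);
                String checkpoint = pending.getNow(null); // stand-in for getWithoutException
                return CompletableFuture.completedFuture("master state of " + checkpoint);
            }, timer);

            // The single place where a failure from any upstream stage is handled.
            CompletableFuture<String> handled = masterDone.handle((result, error) -> {
                if (error == null) {
                    return result;
                }
                Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                        ? error.getCause() : error;
                return "handled: " + cause.getMessage();
            });

            if (failPending) {
                pending.completeExceptionally(new IllegalStateException("trigger failed"));
            } else {
                pending.complete("checkpoint-1");
            }
            return handled.join();
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runChain(false)); // master state of checkpoint-1
        System.out.println(runChain(true));  // handled: trigger failed
    }
}
```

In the failing run the master-state lambda never executes, which is the property that lets the real code skip a local `try`/`catch` around the future read.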
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -2260,6 +2263,121 @@ public void testSavepointScheduledInUnalignedMode() throws Exception {
}
}
+ /**
+ * Test that the checkpoint still behaves correctly when the task checkpoint is triggered by the
+ * master hooks and finished before the master checkpoint.
+ */
Review comment:
The test covers the checkpoint failure issue. The other case is only tested indirectly, by other tests which ensure that all the `OperatorCoordinators` have finished their snapshots before moving on to the next step. We can add another test to explicitly cover the ordering issue.
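An explicit ordering test could record the order in which the steps run and assert that the master hook comes last. The sketch below is hypothetical: it models coordinator snapshots and the master hook with plain `CompletableFuture`s instead of the real `CheckpointCoordinator` test fixtures, and chains the hook after `allOf` of the coordinator futures, in the spirit of the `coordinatorCheckpointsComplete` to `masterStatesComplete` chaining in the diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CheckpointOrderingSketch {

    /** Returns the order in which the simulated snapshot steps ran. */
    static List<String> runCheckpoint(int numCoordinators) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        ExecutorService timer = Executors.newSingleThreadExecutor();
        ExecutorService coordinatorPool = Executors.newFixedThreadPool(4);
        try {
            // Hypothetical coordinator snapshots, completing on other threads in any order.
            List<CompletableFuture<Void>> coordinatorSnapshots = new ArrayList<>();
            for (int i = 0; i < numCoordinators; i++) {
                final int id = i;
                coordinatorSnapshots.add(CompletableFuture.runAsync(
                        () -> events.add("coordinator-" + id), coordinatorPool));
            }
            CompletableFuture<Void> coordinatorsDone =
                    CompletableFuture.allOf(coordinatorSnapshots.toArray(new CompletableFuture[0]));

            // The master hook (which may start task checkpoints through an external system)
            // is only triggered once every coordinator snapshot has finished.
            CompletableFuture<Void> masterDone =
                    coordinatorsDone.thenRunAsync(() -> events.add("master-hook"), timer);

            masterDone.join();
            return new ArrayList<>(events);
        } finally {
            timer.shutdown();
            coordinatorPool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> order = runCheckpoint(3);
        // The master hook must be the last event, whatever order the coordinators finished in.
        System.out.println(order.get(order.size() - 1)); // master-hook
    }
}
```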
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -530,15 +530,22 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
request.getOnCompletionFuture()),
timer);
- final CompletableFuture<?> masterStatesComplete = pendingCheckpointCompletableFuture
- .thenCompose(this::snapshotMasterState);
-
final CompletableFuture<?> coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
.thenComposeAsync((pendingCheckpoint) ->
OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
coordinatorsToCheckpoint, pendingCheckpoint, timer),
timer);
+ // We have to take the snapshot of the master hooks after the coordinator checkpoints have completed.
+ // This is to ensure the tasks are checkpointed after the OperatorCoordinators in case
+ // ExternallyInducedSource is used.
+ final CompletableFuture<?> masterStatesComplete = coordinatorCheckpointsComplete
+ .thenComposeAsync(ignored -> {
Review comment:
That is true. The main reason I changed it to `thenComposeAsync` is safety. While it is quite clear that the code _getting_ this `coordinatorCheckpointsComplete` future executes in the `timer`, it takes a few more clicks to confirm that the code that _completes_ `coordinatorCheckpointsComplete` also runs in the `timer`.
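To make the distinction concrete, here is a small sketch using only `java.util.concurrent`: a stage added with `thenCompose` runs on whichever thread completes the upstream future (or on the caller's thread if the future is already complete), while `thenComposeAsync(fn, executor)` always hands the function to the given executor. The executor named `timer` merely stands in for the coordinator's `timer`; the other names are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class ComposeThreadingSketch {

    /** Returns {thread that ran the thenCompose stage, thread that ran the thenComposeAsync stage}. */
    static String[] run() {
        AtomicReference<String> composeThread = new AtomicReference<>();
        AtomicReference<String> composeAsyncThread = new AtomicReference<>();
        // Single-threaded executor standing in for the coordinator's `timer`.
        ExecutorService timer = Executors.newSingleThreadExecutor(r -> new Thread(r, "timer"));
        // Some other thread that happens to complete the upstream future (e.g. an RPC thread).
        ExecutorService other = Executors.newSingleThreadExecutor(r -> new Thread(r, "other-thread"));
        try {
            CompletableFuture<Void> upstream = new CompletableFuture<>();

            // thenCompose: runs on whichever thread completes `upstream`.
            CompletableFuture<Void> viaCompose = upstream.thenCompose(v -> {
                composeThread.set(Thread.currentThread().getName());
                return CompletableFuture.<Void>completedFuture(null);
            });

            // thenComposeAsync(fn, timer): always handed to `timer`.
            CompletableFuture<Void> viaComposeAsync = upstream.thenComposeAsync(v -> {
                composeAsyncThread.set(Thread.currentThread().getName());
                return CompletableFuture.<Void>completedFuture(null);
            }, timer);

            // Complete `upstream` from the other thread, after both stages are registered.
            CompletableFuture.runAsync(() -> upstream.complete(null), other).join();

            CompletableFuture.allOf(viaCompose, viaComposeAsync).join();
            return new String[] {composeThread.get(), composeAsyncThread.get()};
        } finally {
            timer.shutdown();
            other.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("thenCompose ran on: " + threads[0]);      // other-thread
        System.out.println("thenComposeAsync ran on: " + threads[1]); // timer
    }
}
```

With the async variant, the threading guarantee no longer depends on who completes the upstream future, which is the "safety" argument above.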
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -576,7 +583,22 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
request.advanceToEndOfTime);
coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
-
+ // It is possible that the tasks have finished checkpointing at this point.
+ // So we need to complete this pending checkpoint.
Review comment:
This is not only about the threading model in the JM itself. Even if there is a single thread in the JM, once the master hooks trigger the external system to start the checkpoint, the tasks will still send checkpoint acknowledgements to the JM. Those acks are enqueued into the JM thread's task queue and may still be processed before the master state snapshot completes. In this case, a single-threaded model won't solve the problem.
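The scenario can be reproduced with a single-threaded executor standing in for the JM main thread. In this sketch the class and method names are hypothetical (not Flink's `PendingCheckpoint` API): the task acks triggered via the master hook are queued ahead of the master-state callback, so the completion check has to run again when the master state arrives, which appears to be what the added lines in the last hunk address.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadAckRaceSketch {

    /** Hypothetical, simplified pending checkpoint; only ever touched on the JM thread. */
    static final class PendingCheckpointSketch {
        private final List<String> log = new ArrayList<>();
        private int pendingTaskAcks;
        private boolean masterStateAcknowledged;
        private boolean completed;

        PendingCheckpointSketch(int numTasks) {
            this.pendingTaskAcks = numTasks;
        }

        void acknowledgeTask(int taskId) {
            pendingTaskAcks--;
            log.add("ack-" + taskId);
            completeIfPossible();
        }

        void acknowledgeMasterState() {
            masterStateAcknowledged = true;
            log.add("master-state");
            // All task acks may already be in, so completion must be checked here too.
            completeIfPossible();
        }

        private void completeIfPossible() {
            if (!completed && pendingTaskAcks == 0 && masterStateAcknowledged) {
                completed = true;
                log.add("completed");
            }
        }
    }

    static List<String> run(int numTasks) {
        // The single JM main thread: every callback below runs here, in FIFO order.
        ExecutorService jobMaster = Executors.newSingleThreadExecutor();
        try {
            PendingCheckpointSketch checkpoint = new PendingCheckpointSketch(numTasks);
            CompletableFuture<Void> masterStateSnapshot = new CompletableFuture<>();
            CompletableFuture<Void> masterStateAcked =
                    masterStateSnapshot.thenRunAsync(checkpoint::acknowledgeMasterState, jobMaster);

            jobMaster.execute(() -> {
                // The master hook triggers the external system; the tasks checkpoint and
                // their acks are enqueued on the same single JM thread ...
                for (int i = 0; i < numTasks; i++) {
                    final int taskId = i;
                    jobMaster.execute(() -> checkpoint.acknowledgeTask(taskId));
                }
                // ... before the master state snapshot completes.
                masterStateSnapshot.complete(null);
            });

            masterStateAcked.join();
            return new ArrayList<>(checkpoint.log);
        } finally {
            jobMaster.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // [ack-0, ack-1, ack-2, master-state, completed]
    }
}
```

If the `completeIfPossible()` call is dropped from `acknowledgeMasterState()`, the log ends at `master-state`: every task ack arrived while the master state was still missing, and nothing would ever complete the checkpoint, even though only one thread is involved.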