ifndef-SleePy commented on a change in pull request #12611:
URL: https://github.com/apache/flink/pull/12611#discussion_r439302459
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
##########
@@ -529,6 +537,48 @@ public void testTriggerCheckpointSnapshotMasterHookFailed() throws Exception {
         assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size());
     }
+    /**
+     * This test only fails eventually.
+     */
+    @Test
+    public void discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() throws Exception {
+        final ExecutionVertex executionVertex = mockExecutionVertex(new ExecutionAttemptID());
+
+        final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+        final CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+            .setTasks(new ExecutionVertex[]{executionVertex})
+            .setTimer(new ScheduledExecutorServiceAdapter(scheduledExecutorService))
+            .setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder()
+                .build())
+            .build();
+
+        final CompletableFuture<String> masterHookCheckpointFuture = new CompletableFuture<>();
+        final OneShotLatch triggerCheckpointLatch = new OneShotLatch();
+        checkpointCoordinator.addMasterHook(new TestingMasterHook(masterHookCheckpointFuture, triggerCheckpointLatch));
+
+        try {
+            checkpointCoordinator.triggerCheckpoint(false);
+            final CompletableFuture<CompletedCheckpoint> secondCheckpoint = checkpointCoordinator.triggerCheckpoint(false);
+
+            triggerCheckpointLatch.await();
+            masterHookCheckpointFuture.complete("Completed");
+
+            // discard triggering checkpoint
+            checkpointCoordinator.abortPendingCheckpoints(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+            try {
+                // verify that the second checkpoint request will be executed and eventually times out
+                secondCheckpoint.get();
Review comment:
The test does not fail on my side either (based on release-1.11). I think the
reason is that no asynchronous call completes after the pending checkpoint
has been aborted.
I would suggest using `manuallyTriggeredScheduledExecutor` as the timer thread
and a separate thread for the IO executor, for two reasons:
1. It's closer to the real scenario, and we can control the order of each
invocation precisely. We can abort the pending checkpoint first and then
complete an asynchronous IO invocation to simulate the NPE scenario.
2. In later PRs, when the main thread is introduced to replace the timer thread
(this is currently reverted due to FLINK-16770; I'm preparing to bring it back),
the coordinator-wide lock will be removed, and `triggerCheckpoint` and
`abortPendingCheckpoints` will be called only in the main thread. At that
point, we will have to use `manuallyTriggeredScheduledExecutor` as the main
thread executor to simulate the lock-free, single-threaded threading model. If
we do it here, we won't need to change it then.
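To illustrate what I mean by controlling the sequence, here is a minimal,
Flink-free sketch. `ManualExecutor` is a hypothetical stand-in written only for
this example (it is not Flink's `ManuallyTriggeredScheduledExecutor`), and the
`discarded` flag merely models the pending checkpoint's discarded state. The
callback is queued when the future completes, the abort happens next, and the
callback only runs once the test releases it:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for a manually triggered executor; not Flink's class. */
final class ManualExecutor implements Executor {
    private final Queue<Runnable> queued = new ArrayDeque<>();

    @Override
    public void execute(Runnable command) {
        // Hold the task until the test explicitly releases it.
        queued.add(command);
    }

    /** Runs all queued tasks on the calling thread, including tasks queued meanwhile. */
    void triggerAll() {
        Runnable next;
        while ((next = queued.poll()) != null) {
            next.run();
        }
    }

    int numQueued() {
        return queued.size();
    }
}

final class ManualOrderingSketch {
    /** Returns the observed events in order. */
    static List<String> run() {
        final ManualExecutor timer = new ManualExecutor();
        final List<String> events = new ArrayList<>();
        final boolean[] discarded = {false}; // models the "is discarded" state (assumption)

        final CompletableFuture<String> masterHookFuture = new CompletableFuture<>();
        masterHookFuture.whenCompleteAsync(
            (value, error) ->
                events.add(discarded[0] ? "callback-saw-discarded" : "callback-saw-live"),
            timer);

        masterHookFuture.complete("Completed"); // callback is queued, not executed
        events.add("queued=" + timer.numQueued());

        discarded[0] = true; // models aborting the pending checkpoint
        events.add("aborted");

        timer.triggerAll(); // only now does the callback run
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With a real single-threaded scheduled executor the callback may run before or
after the abort, which would explain why the test only fails occasionally.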
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -538,36 +538,45 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
                     coordinatorsToCheckpoint, pendingCheckpoint, timer),
                 timer);
-        CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
-            .whenCompleteAsync(
-                (ignored, throwable) -> {
-                    final PendingCheckpoint checkpoint =
-                        FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
-
-                    if (throwable == null && checkpoint != null && !checkpoint.isDiscarded()) {
-                        // no exception, no discarding, everything is OK
-                        final long checkpointId = checkpoint.getCheckpointId();
-                        snapshotTaskState(
-                            timestamp,
-                            checkpointId,
-                            checkpoint.getCheckpointStorageLocation(),
-                            request.props,
-                            executions,
-                            request.advanceToEndOfTime);
-
-                        coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
-
-                        onTriggerSuccess();
-                    } else {
-                        // the initialization might not be finished yet
-                        if (checkpoint == null) {
-                            onTriggerFailure(request, throwable);
+        FutureUtils.assertNoException(
Review comment:
I agree that `FutureUtils.assertNoException` is a bit rough here. However, it
would be more complex to fail the job through `CheckpointFailureManager`,
because we would have to consider two more things:
1. Is the checkpoint properly initialized? `CheckpointFailureManager`
requires a checkpoint id, but we might not have one.
2. `isTriggering` must be taken care of. Failing the job does not reset this
flag.
So `FutureUtils.assertNoException` is acceptable to me. Technically
speaking, it should never be reached unless there is a bug in our code or some
other extreme corner case, such as an OOM.
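For reference, the pattern I have in mind looks roughly like the sketch below.
It is a simplified stand-in, not Flink's actual `FutureUtils` code: the
`assertNoException` helper and its `fatalHandler` parameter are hypothetical and
exist only for this example. Expected trigger failures are handled inside the
callback; only a failure of the callback itself (i.e. a bug) reaches the fatal
handler:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class AssertNoExceptionSketch {

    /** Hypothetical helper: forwards any failure of the future to fatalHandler. */
    static void assertNoException(CompletableFuture<?> future, Consumer<Throwable> fatalHandler) {
        future.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                // Unwrap the CompletionException added by CompletableFuture.
                Throwable cause =
                    throwable instanceof CompletionException && throwable.getCause() != null
                        ? throwable.getCause()
                        : throwable;
                fatalHandler.accept(cause);
            }
        });
    }

    /** Simulates a failed trigger whose callback is either correct or buggy. */
    static String run(boolean callbackHasBug) {
        final AtomicReference<Throwable> fatal = new AtomicReference<>();
        final CompletableFuture<String> trigger = new CompletableFuture<>();

        final CompletableFuture<Void> handled = trigger.handle((value, error) -> {
            if (callbackHasBug) {
                // Models an unexpected failure inside the callback itself.
                throw new IllegalStateException("unexpected bug");
            }
            // Expected failures (error != null) are dealt with here and swallowed.
            return null;
        });
        assertNoException(handled, fatal::set);

        trigger.completeExceptionally(new RuntimeException("expected trigger failure"));
        return fatal.get() == null ? "no fatal error" : "fatal: " + fatal.get().getMessage();
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

The point is that the guard stays silent for ordinary trigger failures and only
fires when the handling code itself throws.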