ifndef-SleePy commented on a change in pull request #10332:
[FLINK-13905][checkpointing] Separate checkpoint triggering into several
asynchronous stages
URL: https://github.com/apache/flink/pull/10332#discussion_r367778946
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -576,85 +648,181 @@ public boolean isShutdown() {
checkpoint.setStatsCallback(callback);
}
- // schedule the timer that will clean up the expired checkpoints
- final Runnable canceller = () -> {
- synchronized (lock) {
- // only do the work if the checkpoint is not
discarded anyways
- // note that checkpoint completion discards the
pending checkpoint object
- if (!checkpoint.isDiscarded()) {
- LOG.info("Checkpoint {} of job {}
expired before completing.", checkpointID, job);
-
- abortPendingCheckpoint(
- checkpoint,
- new
CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED));
- }
- }
- };
+ synchronized (lock) {
- try {
- // re-acquire the coordinator-wide lock
- synchronized (lock) {
- preCheckBeforeTriggeringCheckpoint(isPeriodic,
props.forceCheckpoint());
+ pendingCheckpoints.put(checkpointID, checkpoint);
- LOG.info("Triggering checkpoint {} @ {} for job
{}.", checkpointID, timestamp, job);
+ ScheduledFuture<?> cancellerHandle = timer.schedule(
+ new CheckpointCanceller(checkpoint),
+ checkpointTimeout, TimeUnit.MILLISECONDS);
- pendingCheckpoints.put(checkpointID,
checkpoint);
+ if (!checkpoint.setCancellerHandle(cancellerHandle)) {
+ // checkpoint is already disposed!
+ cancellerHandle.cancel(false);
+ }
+ }
- ScheduledFuture<?> cancellerHandle =
timer.schedule(
- canceller,
- checkpointTimeout,
TimeUnit.MILLISECONDS);
+ LOG.info("Triggering checkpoint {} @ {} for job {}.",
checkpointID, timestamp, job);
+ return checkpoint;
+ }
- if
(!checkpoint.setCancellerHandle(cancellerHandle)) {
- // checkpoint is already disposed!
- cancellerHandle.cancel(false);
- }
+ /**
+ * Snapshot master hook states asynchronously.
+ *
+ * @param checkpoint the pending checkpoint
+ * @return the future represents master hook states are finished or not
+ */
+ private CompletableFuture<Void> snapshotMasterState(PendingCheckpoint
checkpoint) {
+ if (masterHooks.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
- // TODO, asynchronously snapshots master hook
without waiting here
- for (MasterTriggerRestoreHook<?> masterHook :
masterHooks.values()) {
- final MasterState masterState =
-
MasterHooks.triggerHook(masterHook, checkpointID, timestamp, executor)
- .get(checkpointTimeout,
TimeUnit.MILLISECONDS);
-
checkpoint.acknowledgeMasterState(masterHook.getIdentifier(), masterState);
- }
-
Preconditions.checkState(checkpoint.areMasterStatesFullyAcknowledged());
- }
- // end of lock scope
+ final long checkpointID = checkpoint.getCheckpointId();
+ final long timestamp = checkpoint.getCheckpointTimestamp();
- final CheckpointOptions checkpointOptions = new
CheckpointOptions(
- props.getCheckpointType(),
-
checkpointStorageLocation.getLocationReference());
+ final CompletableFuture<Void> masterStateCompletableFuture =
new CompletableFuture<>();
+ for (MasterTriggerRestoreHook<?> masterHook :
masterHooks.values()) {
+ MasterHooks
+ .triggerHook(masterHook, checkpointID,
timestamp, executor)
+ .whenCompleteAsync((masterState, throwable) -> {
+ try {
+ synchronized (lock) {
+ if
(masterStateCompletableFuture.isDone()) {
+ return;
+ }
+ if
(checkpoint.isDiscarded()) {
+ throw new
IllegalStateException(
+
"Checkpoint " + checkpointID + " has been discarded");
+ }
+ if (throwable == null) {
+
checkpoint.acknowledgeMasterState(
+
masterHook.getIdentifier(), masterState);
+ if
(checkpoint.areMasterStatesFullyAcknowledged()) {
+
masterStateCompletableFuture.complete(null);
+ }
+ } else {
+
masterStateCompletableFuture.completeExceptionally(throwable);
+ }
+ }
+ } catch (Throwable t) {
+
masterStateCompletableFuture.completeExceptionally(t);
+ }
+ }, timer);
+ }
+ return masterStateCompletableFuture;
+ }
- // send the messages to the tasks that trigger their
checkpoint
- for (Execution execution: executions) {
- if (props.isSynchronous()) {
-
execution.triggerSynchronousSavepoint(checkpointID, timestamp,
checkpointOptions, advanceToEndOfTime);
- } else {
-
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
- }
+ /**
+ * Snapshot task state.
+ *
+ * @param timestamp the timestamp of this checkpoint reques
+ * @param checkpointID the checkpoint id
+ * @param checkpointStorageLocation the checkpoint location
+ * @param props the checkpoint properties
+ * @param executions the executions which should be triggered
+ * @param advanceToEndOfTime Flag indicating if the source should
inject a {@code MAX_WATERMARK}
+ * in the pipeline to fire any registered
event-time timers.
+ */
+ private void snapshotTaskState(
+ long timestamp,
+ long checkpointID,
+ CheckpointStorageLocation checkpointStorageLocation,
+ CheckpointProperties props,
+ Execution[] executions,
+ boolean advanceToEndOfTime) {
+
+ final CheckpointOptions checkpointOptions = new
CheckpointOptions(
+ props.getCheckpointType(),
+ checkpointStorageLocation.getLocationReference());
+
+ // send the messages to the tasks that trigger their checkpoint
+ for (Execution execution: executions) {
+ if (props.isSynchronous()) {
+
execution.triggerSynchronousSavepoint(checkpointID, timestamp,
checkpointOptions, advanceToEndOfTime);
+ } else {
+ execution.triggerCheckpoint(checkpointID,
timestamp, checkpointOptions);
}
-
- numUnsuccessfulCheckpointsTriggers.set(0);
- return checkpoint.getCompletionFuture();
}
- catch (Throwable t) {
- int numUnsuccessful =
numUnsuccessfulCheckpointsTriggers.incrementAndGet();
- LOG.warn("Failed to trigger checkpoint {} for job {}.
({} consecutive failed attempts so far)",
- checkpointID, job, numUnsuccessful, t);
+ }
- synchronized (lock) {
- if (!checkpoint.isDiscarded()) {
- abortPendingCheckpoint(
- checkpoint,
- new CheckpointException(
-
CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, t));
+ /**
+ * Trigger request is successful.
+ * NOTE, it must be invoked if trigger request is successful.
+ */
+ private void onTriggerSuccess() {
+ isTriggering = false;
+ numUnsuccessfulCheckpointsTriggers.set(0);
+ checkQueuedCheckpointTriggerRequest();
+ }
+
+ /**
+ * The trigger request is failed prematurely without a proper
initialization.
+ * There is no resource to release, but the completion promise needs to
fail manually here.
+ *
+ * @param onCompletionPromise the completion promise of the
checkpoint/savepoint
+ * @param throwable the reason of trigger failure
+ */
+ private void onTriggerFailure(
+ CompletableFuture<CompletedCheckpoint> onCompletionPromise,
Throwable throwable) {
+ final CheckpointException checkpointException =
+
getCheckpointException(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
throwable);
+ onCompletionPromise.completeExceptionally(checkpointException);
+ onTriggerFailure((PendingCheckpoint) null, checkpointException);
+ }
+
+ /**
+ * The trigger request is failed.
+ * NOTE, it must be invoked if trigger request is failed.
+ *
+ * @param checkpoint the pending checkpoint which is failed. It could
be null if it's failed
+ * prematurely without a proper initialization.
+ * @param throwable the reason of trigger failure
+ */
+ private void onTriggerFailure(@Nullable PendingCheckpoint checkpoint,
Throwable throwable) {
+ try {
Review comment:
Yes, you are right, we need some checking here. As mentioned above, it's not
easy (or at least a bit hacky) to check the current thread for a
`ScheduledExecutor`. Since the timer thread will be abandoned once we move all
of this code into the main thread, I think we should not spend too much time
on this. We could do that check in the main thread.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services