Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3411#discussion_r103192397 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java --- @@ -203,48 +208,67 @@ void setStatsCallback(@Nullable PendingCheckpointStats trackerCallback) { return onCompletionPromise; } - public CompletedCheckpoint finalizeCheckpoint() { + public CompletedCheckpoint finalizeCheckpointExternalized() throws IOException { + synchronized (lock) { - Preconditions.checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet."); - - // Persist if required - String externalPath = null; - if (props.externalizeCheckpoint()) { - try { - Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values()); - externalPath = SavepointStore.storeSavepoint( - targetDirectory, - savepoint - ); - } catch (IOException e) { - LOG.error("Failed to persist checkpoint {}.",checkpointId, e); - } - } + checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet."); - CompletedCheckpoint completed = new CompletedCheckpoint( - jobId, - checkpointId, - checkpointTimestamp, - System.currentTimeMillis(), - new HashMap<>(taskStates), - props, - externalPath); + // externalize the metadata + final Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values()); - onCompletionPromise.complete(completed); + // TEMP FIX - The savepoint store is strictly typed to file systems currently + // but the checkpoints think more generic. we need to work with file handles + // here until the savepoint serializer accepts a generic stream factory - if (statsCallback != null) { - // Finalize the statsCallback and give the completed checkpoint a - // callback for discards. 
- CompletedCheckpointStats.DiscardCallback discardCallback = statsCallback.reportCompletedCheckpoint(externalPath); - completed.setDiscardCallback(discardCallback); - } + final FileStateHandle metadataHandle = SavepointStore.storeSavepointToHandle(targetDirectory, savepoint); + final String externalPointer = metadataHandle.getFilePath().getParent().toString(); - dispose(false); + return finalizeInternal(metadataHandle, externalPointer); + } + } + + public CompletedCheckpoint finalizeCheckpointNonExternalized() { + synchronized (lock) { + checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet."); - return completed; + // finalize without external metadata + return finalizeInternal(null, null); } } + @GuardedBy("lock") + private CompletedCheckpoint finalizeInternal( + @Nullable StreamStateHandle externalMetadata, + @Nullable String externalPointer) { + + assert(Thread.holdsLock(lock)); + + CompletedCheckpoint completed = new CompletedCheckpoint( + jobId, + checkpointId, + checkpointTimestamp, + System.currentTimeMillis(), + new HashMap<>(taskStates), + props, + externalMetadata, + externalPointer); + + onCompletionPromise.complete(completed); --- End diff -- If the creation of the `CompletedCheckpoint` fails (for example, because the external metadata is null although the properties say the checkpoint should have been externalized), the promise is never completed. I think we should wrap the creation in a try/catch and fail the promise exceptionally in that case, so that anyone waiting on it is notified of the failure.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---