[ https://issues.apache.org/jira/browse/FLINK-5897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15885661#comment-15885661 ]
ASF GitHub Bot commented on FLINK-5897: --------------------------------------- Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3411#discussion_r103192397 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java --- @@ -203,48 +208,67 @@ void setStatsCallback(@Nullable PendingCheckpointStats trackerCallback) { return onCompletionPromise; } - public CompletedCheckpoint finalizeCheckpoint() { + public CompletedCheckpoint finalizeCheckpointExternalized() throws IOException { + synchronized (lock) { - Preconditions.checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet."); - - // Persist if required - String externalPath = null; - if (props.externalizeCheckpoint()) { - try { - Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values()); - externalPath = SavepointStore.storeSavepoint( - targetDirectory, - savepoint - ); - } catch (IOException e) { - LOG.error("Failed to persist checkpoint {}.",checkpointId, e); - } - } + checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet."); - CompletedCheckpoint completed = new CompletedCheckpoint( - jobId, - checkpointId, - checkpointTimestamp, - System.currentTimeMillis(), - new HashMap<>(taskStates), - props, - externalPath); + // externalize the metadata + final Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values()); - onCompletionPromise.complete(completed); + // TEMP FIX - The savepoint store is strictly typed to file systems currently + // but the checkpoints think more generic. we need to work with file handles + // here until the savepoint serializer accepts a generic stream factory - if (statsCallback != null) { - // Finalize the statsCallback and give the completed checkpoint a - // callback for discards. 
- CompletedCheckpointStats.DiscardCallback discardCallback = statsCallback.reportCompletedCheckpoint(externalPath); - completed.setDiscardCallback(discardCallback); - } + final FileStateHandle metadataHandle = SavepointStore.storeSavepointToHandle(targetDirectory, savepoint); + final String externalPointer = metadataHandle.getFilePath().getParent().toString(); - dispose(false); + return finalizeInternal(metadataHandle, externalPointer); + } + } + + public CompletedCheckpoint finalizeCheckpointNonExternalized() { + synchronized (lock) { + checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet."); - return completed; + // finalize without external metadata + return finalizeInternal(null, null); } } + @GuardedBy("lock") + private CompletedCheckpoint finalizeInternal( + @Nullable StreamStateHandle externalMetadata, + @Nullable String externalPointer) { + + assert(Thread.holdsLock(lock)); + + CompletedCheckpoint completed = new CompletedCheckpoint( + jobId, + checkpointId, + checkpointTimestamp, + System.currentTimeMillis(), + new HashMap<>(taskStates), + props, + externalMetadata, + externalPointer); + + onCompletionPromise.complete(completed); --- End diff -- If the creation of the `CompletedCheckpoint` fails (for example, because the external metadata is null although the properties say the checkpoint should have been externalized), the promise is never completed. I think we should wrap it in a try/catch and fail the promise in that case. > Untie Checkpoint Externalization from FileSystems > ------------------------------------------------- > > Key: FLINK-5897 > URL: https://issues.apache.org/jira/browse/FLINK-5897 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing > Affects Versions: 1.2.0 > Reporter: Stephan Ewen > Assignee: Stephan Ewen > Fix For: 1.3.0 > > > Currently, externalizing checkpoint metadata and storing savepoints depend > strictly on FileSystems. 
> Since state backends are more general, storing and cleaning up checkpoints > with state backend hooks requires untying savepoints and externalized > checkpoints from filesystems. -- This message was sent by Atlassian JIRA (v6.3.15#6346)