[ 
https://issues.apache.org/jira/browse/FLINK-5897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15885661#comment-15885661
 ] 

ASF GitHub Bot commented on FLINK-5897:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3411#discussion_r103192397
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---
    @@ -203,48 +208,67 @@ void setStatsCallback(@Nullable PendingCheckpointStats trackerCallback) {
                return onCompletionPromise;
        }
     
    -   public CompletedCheckpoint finalizeCheckpoint() {
    +   public CompletedCheckpoint finalizeCheckpointExternalized() throws IOException {
    +
                synchronized (lock) {
    -                   Preconditions.checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
    -
    -                   // Persist if required
    -                   String externalPath = null;
    -                   if (props.externalizeCheckpoint()) {
    -                           try {
    -                                   Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
    -                                   externalPath = SavepointStore.storeSavepoint(
    -                                                   targetDirectory,
    -                                                   savepoint
    -                                   );
    -                           } catch (IOException e) {
    -                                   LOG.error("Failed to persist checkpoint {}.",checkpointId, e);
    -                           }
    -                   }
    +                   checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
     
    -                   CompletedCheckpoint completed = new CompletedCheckpoint(
    -                                   jobId,
    -                                   checkpointId,
    -                                   checkpointTimestamp,
    -                                   System.currentTimeMillis(),
    -                                   new HashMap<>(taskStates),
    -                                   props,
    -                                   externalPath);
    +                   // externalize the metadata
    +                   final Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
     
    -                   onCompletionPromise.complete(completed);
    +                   // TEMP FIX - The savepoint store is strictly typed to file systems currently
    +                   //            but the checkpoints think more generic. we need to work with file handles
    +                   //            here until the savepoint serializer accepts a generic stream factory
     
    -                   if (statsCallback != null) {
    -                           // Finalize the statsCallback and give the completed checkpoint a
    -                           // callback for discards.
    -                           CompletedCheckpointStats.DiscardCallback discardCallback = statsCallback.reportCompletedCheckpoint(externalPath);
    -                           completed.setDiscardCallback(discardCallback);
    -                   }
    +                   final FileStateHandle metadataHandle = SavepointStore.storeSavepointToHandle(targetDirectory, savepoint);
    +                   final String externalPointer = metadataHandle.getFilePath().getParent().toString();
     
    -                   dispose(false);
    +                   return finalizeInternal(metadataHandle, externalPointer);
    +           }
    +   }
    +
    +   public CompletedCheckpoint finalizeCheckpointNonExternalized() {
    +           synchronized (lock) {
    +                   checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
     
    -                   return completed;
    +                   // finalize without external metadata
    +                   return finalizeInternal(null, null);
                }
        }
     
    +   @GuardedBy("lock")
    +   private CompletedCheckpoint finalizeInternal(
    +                   @Nullable StreamStateHandle externalMetadata,
    +                   @Nullable String externalPointer) {
    +
    +           assert(Thread.holdsLock(lock));
    +
    +           CompletedCheckpoint completed = new CompletedCheckpoint(
    +                           jobId,
    +                           checkpointId,
    +                           checkpointTimestamp,
    +                           System.currentTimeMillis(),
    +                           new HashMap<>(taskStates),
    +                           props,
    +                           externalMetadata,
    +                           externalPointer);
    +
    +           onCompletionPromise.complete(completed);
    --- End diff --
    
    If the creation of `CompletedCheckpoint` fails (for example because the
    external metadata is null although the properties say the checkpoint should
    have been externalized), the promise is never completed. I think we should
    wrap the creation in a try/catch and fail the promise in that case.
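
    A minimal sketch of that suggestion, not the actual Flink code: it uses
    `java.util.concurrent.CompletableFuture` as a stand-in for the promise type,
    and `FinalizeSketch`, `Completed`, and the simplified `finalizeInternal`
    signature are hypothetical names chosen for illustration only.

```java
import java.util.concurrent.CompletableFuture;

final class FinalizeSketch {

    /** Hypothetical stand-in for CompletedCheckpoint; rejects inconsistent arguments. */
    static final class Completed {
        final String externalPointer;

        Completed(boolean externalize, String externalPointer) {
            if (externalize && externalPointer == null) {
                throw new IllegalArgumentException(
                        "Checkpoint should be externalized but has no external pointer.");
            }
            this.externalPointer = externalPointer;
        }
    }

    /** Completes the promise on success and fails it if construction throws. */
    static Completed finalizeInternal(
            CompletableFuture<Completed> promise,
            boolean externalize,
            String externalPointer) {
        try {
            Completed completed = new Completed(externalize, externalPointer);
            promise.complete(completed);
            return completed;
        } catch (RuntimeException e) {
            // Without this, anyone waiting on the promise would wait forever.
            promise.completeExceptionally(e);
            throw e;
        }
    }
}
```

    With this shape, a caller that passes inconsistent arguments still sees the
    exception, and anything waiting on the promise is notified of the failure
    instead of hanging.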


> Untie Checkpoint Externalization from FileSystems
> -------------------------------------------------
>
>                 Key: FLINK-5897
>                 URL: https://issues.apache.org/jira/browse/FLINK-5897
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.2.0
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.3.0
>
>
> Currently, externalizing checkpoint metadata and storing savepoints depends 
> strictly on FileSystems.
> Since state backends are more general, storing and cleaning up checkpoints 
> with state backend hooks requires untying savepoints and externalized 
> checkpoints from filesystems.


