GitHub user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3411#discussion_r103192397
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---
    @@ -203,48 +208,67 @@ void setStatsCallback(@Nullable PendingCheckpointStats trackerCallback) {
                return onCompletionPromise;
        }
     
    -   public CompletedCheckpoint finalizeCheckpoint() {
    +   public CompletedCheckpoint finalizeCheckpointExternalized() throws IOException {
    +
                synchronized (lock) {
    -                   Preconditions.checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
    -
    -                   // Persist if required
    -                   String externalPath = null;
    -                   if (props.externalizeCheckpoint()) {
    -                           try {
    -                                   Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
    -                                   externalPath = SavepointStore.storeSavepoint(
    -                                                   targetDirectory,
    -                                                   savepoint
    -                                   );
    -                           } catch (IOException e) {
    -                                   LOG.error("Failed to persist checkpoint {}.",checkpointId, e);
    -                           }
    -                   }
    +                   checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
     
    -                   CompletedCheckpoint completed = new CompletedCheckpoint(
    -                                   jobId,
    -                                   checkpointId,
    -                                   checkpointTimestamp,
    -                                   System.currentTimeMillis(),
    -                                   new HashMap<>(taskStates),
    -                                   props,
    -                                   externalPath);
    +                   // externalize the metadata
    +                   final Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
     
    -                   onCompletionPromise.complete(completed);
    +                   // TEMP FIX - The savepoint store is strictly typed to file systems currently
    +                   //            but the checkpoints think more generic. we need to work with file handles
    +                   //            here until the savepoint serializer accepts a generic stream factory
     
    -                   if (statsCallback != null) {
    -                           // Finalize the statsCallback and give the completed checkpoint a
    -                           // callback for discards.
    -                           CompletedCheckpointStats.DiscardCallback discardCallback = statsCallback.reportCompletedCheckpoint(externalPath);
    -                           completed.setDiscardCallback(discardCallback);
    -                   }
    +                   final FileStateHandle metadataHandle = SavepointStore.storeSavepointToHandle(targetDirectory, savepoint);
    +                   final String externalPointer = metadataHandle.getFilePath().getParent().toString();
     
    -                   dispose(false);
    +                   return finalizeInternal(metadataHandle, externalPointer);
    +           }
    +   }
    +
    +   public CompletedCheckpoint finalizeCheckpointNonExternalized() {
    +           synchronized (lock) {
    +                   checkState(isFullyAcknowledged(), "Pending checkpoint has not been fully acknowledged yet.");
     
    -                   return completed;
    +                   // finalize without external metadata
    +                   return finalizeInternal(null, null);
                }
        }
     
    +   @GuardedBy("lock")
    +   private CompletedCheckpoint finalizeInternal(
    +                   @Nullable StreamStateHandle externalMetadata,
    +                   @Nullable String externalPointer) {
    +
    +           assert(Thread.holdsLock(lock));
    +
    +           CompletedCheckpoint completed = new CompletedCheckpoint(
    +                           jobId,
    +                           checkpointId,
    +                           checkpointTimestamp,
    +                           System.currentTimeMillis(),
    +                           new HashMap<>(taskStates),
    +                           props,
    +                           externalMetadata,
    +                           externalPointer);
    +
    +           onCompletionPromise.complete(completed);
    --- End diff --
    
    If the creation of the `CompletedCheckpoint` fails (for example because the
    external metadata is null although the properties say the checkpoint should
    have been externalized), the promise is never completed. I think we should
    wrap this in a try-catch and fail the promise in that case.
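
A minimal sketch of what that could look like, not the actual patch. It uses `java.util.concurrent.CompletableFuture` as a stand-in for the promise type used in `PendingCheckpoint`, and a hypothetical `Completed` class in place of `CompletedCheckpoint`; the validation inside the constructor is an assumption made only to trigger the failure path.

```java
import java.util.concurrent.CompletableFuture;

final class FinalizeSketch {

    /** Hypothetical stand-in for CompletedCheckpoint; only the null check matters here. */
    static final class Completed {
        final String externalPointer;

        Completed(boolean externalized, String externalPointer) {
            // Assumed validation: an externalized checkpoint needs an external pointer.
            if (externalized && externalPointer == null) {
                throw new IllegalArgumentException(
                        "Externalized checkpoint without external pointer");
            }
            this.externalPointer = externalPointer;
        }
    }

    /** Completes the promise on success and fails it if construction throws. */
    static Completed finalizeInternal(
            CompletableFuture<Completed> onCompletionPromise,
            boolean externalized,
            String externalPointer) {
        try {
            Completed completed = new Completed(externalized, externalPointer);
            onCompletionPromise.complete(completed);
            return completed;
        } catch (Throwable t) {
            // Without this, anyone waiting on the promise would never be notified.
            onCompletionPromise.completeExceptionally(t);
            throw t;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Completed> promise = new CompletableFuture<>();
        try {
            finalizeInternal(promise, true, null);
        } catch (IllegalArgumentException e) {
            System.out.println("failed promise: " + promise.isCompletedExceptionally());
        }
    }
}
```

The exception is rethrown after failing the promise so that the direct caller still sees the error; whether the real method should rethrow or only fail the promise is a design choice left open here.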

