[FLINK-5897] [checkpoints] Make checkpoint externalization not depend strictly 
on FileSystems

This is the first step towards checkpoints that can be externalized to other
stores as well, such as key/value stores and databases, if the state backend
supports it.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b7f21d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b7f21d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b7f21d8

Branch: refs/heads/master
Commit: 5b7f21d891b410ca0046efdaf12caf5e73deadf4
Parents: 9912de2
Author: Stephan Ewen <[email protected]>
Authored: Wed Feb 22 22:18:50 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Tue Feb 28 18:59:10 2017 +0100

----------------------------------------------------------------------
 .../org/apache/flink/util/ExceptionUtils.java   |   4 +-
 .../checkpoint/CheckpointCoordinator.java       |  20 ++-
 .../runtime/checkpoint/CompletedCheckpoint.java | 135 +++++++++++++++----
 .../checkpoint/CompletedCheckpointStore.java    |   9 ++
 .../runtime/checkpoint/PendingCheckpoint.java   | 112 ++++++++++-----
 .../StandaloneCompletedCheckpointStore.java     |   4 +
 .../ZooKeeperCompletedCheckpointStore.java      |   5 +
 .../checkpoint/savepoint/SavepointLoader.java   |  19 ++-
 .../checkpoint/savepoint/SavepointStore.java    |  93 ++++++++++---
 .../apache/flink/runtime/state/StateUtil.java   |  17 +--
 .../flink/runtime/jobmanager/JobManager.scala   |  14 +-
 .../CheckpointCoordinatorFailureTest.java       |   5 +
 .../CompletedCheckpointStoreTest.java           |   2 +-
 .../checkpoint/CompletedCheckpointTest.java     |  17 ++-
 .../checkpoint/PendingCheckpointTest.java       |  25 ++--
 .../jobmanager/JobManagerHARecoveryTest.java    |   4 +
 .../runtime/jobmanager/JobManagerITCase.scala   |   2 +-
 .../JobManagerHACheckpointRecoveryITCase.java   |   2 +-
 18 files changed, 365 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index fea25ff..7167a0b 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -257,7 +257,7 @@ public final class ExceptionUtils {
                        throw (Error) t;
                }
                else {
-                       throw new IOException(t);
+                       throw new IOException(t.getMessage(), t);
                }
        }
 
@@ -268,7 +268,7 @@ public final class ExceptionUtils {
         * @param searchType the type of exception to search for in the chain.
         * @return True, if the searched type is nested in the throwable, false 
otherwise.
         */
-       public static boolean containsThrowable(Throwable throwable, Class 
searchType) {
+       public static boolean containsThrowable(Throwable throwable, Class<?> 
searchType) {
                if (throwable == null || searchType == null) {
                        return false;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index c1c65b5..6da6f7d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.TaskStateHandles;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -758,13 +759,19 @@ public class CheckpointCoordinator {
                CompletedCheckpoint completedCheckpoint = null;
 
                try {
-                       completedCheckpoint = 
pendingCheckpoint.finalizeCheckpoint();
+                       // externalize the checkpoint if required
+                       if 
(pendingCheckpoint.getProps().externalizeCheckpoint()) {
+                               completedCheckpoint = 
pendingCheckpoint.finalizeCheckpointExternalized();
+                       } else {
+                               completedCheckpoint = 
pendingCheckpoint.finalizeCheckpointNonExternalized();
+                       }
 
                        
completedCheckpointStore.addCheckpoint(completedCheckpoint);
 
                        rememberRecentCheckpointId(checkpointId);
                        dropSubsumedCheckpoints(checkpointId);
-               } catch (Exception exception) {
+               }
+               catch (Exception exception) {
                        // abort the current pending checkpoint if it has not 
been discarded yet
                        if (!pendingCheckpoint.isDiscarded()) {
                                pendingCheckpoint.abortError(exception);
@@ -779,8 +786,8 @@ public class CheckpointCoordinator {
                                        public void run() {
                                                try {
                                                        cc.discard();
-                                               } catch (Exception 
nestedException) {
-                                                       LOG.warn("Could not 
properly discard completed checkpoint {}.", cc.getCheckpointID(), 
nestedException);
+                                               } catch (Throwable t) {
+                                                       LOG.warn("Could not 
properly discard completed checkpoint {}.", cc.getCheckpointID(), t);
                                                }
                                        }
                                });
@@ -808,11 +815,12 @@ public class CheckpointCoordinator {
                                builder.append(", ");
                        }
                        // Remove last two chars ", "
-                       builder.delete(builder.length() - 2, builder.length());
+                       builder.setLength(builder.length() - 2);
 
                        LOG.debug(builder.toString());
                }
 
+               // send the "notify complete" call to all vertices
                final long timestamp = completedCheckpoint.getTimestamp();
 
                for (ExecutionVertex ev : tasksToCommitTo) {
@@ -934,7 +942,7 @@ public class CheckpointCoordinator {
                                        latest.getCheckpointID(),
                                        latest.getProperties(),
                                        restoreTimestamp,
-                                       latest.getExternalPath());
+                                       latest.getExternalPointer());
 
                                statsTracker.reportRestoredCheckpoint(restored);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index db86484..17ce4d5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import 
org.apache.flink.runtime.checkpoint.CompletedCheckpointStats.DiscardCallback;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,8 +38,36 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A successful checkpoint describes a checkpoint after all required tasks 
acknowledged it (with their state)
- * and that is considered completed.
+ * A CompletedCheckpoint describes a checkpoint after all required tasks 
acknowledged it (with their state)
+ * and that is considered successful. The CompletedCheckpoint class contains 
all the metadata of the
+ * checkpoint, i.e., checkpoint ID, timestamps, and the handles to all states 
that are part of the
+ * checkpoint.
+ * 
+ * <h2>Size of the CompletedCheckpoint Instances</h2>
+ * 
+ * In most cases, the CompletedCheckpoint objects are very small, because the 
handles to the checkpoint
+ * states are only pointers (such as file paths). However, some state backend implementations may
+ * choose to store some payload data directly with the metadata (for example to avoid many small files).
+ * If the size thresholds for such inlined data are set to large values, the memory consumption of the
+ * CompletedCheckpoint objects can be significant.
+ * 
+ * <h2>Externalized Metadata</h2>
+ * 
+ * The metadata of the CompletedCheckpoint is optionally also persisted in an 
external storage
+ * system. In that case, the checkpoint is called <i>externalized</i>.
+ * 
+ * <p>Externalized checkpoints have an external pointer, which points to the 
metadata. For example
+ * when externalizing to a file system, that pointer is the file path to the 
checkpoint's folder
+ * or the metadata file. For a state backend that stores metadata in database 
tables, the pointer
+ * could be the table name and row key. The pointer is encoded as a String.
+ * 
+ * <h2>Externalized Metadata and High-availability</h2>
+ * 
+ * For high-availability setups, the checkpoint metadata must also be stored persistently and be
+ * available. The high-availability services that store the checkpoint ground truth (meaning which
+ * checkpoints are the latest completed ones, and in what order) often rely on checkpoints being
+ * externalized. That way, those services only store pointers to the externalized metadata, rather
+ * than the complete metadata itself (for example, ZooKeeper's ZNode payload should ideally stay well
+ * below a megabyte).
  */
 public class CompletedCheckpoint implements Serializable {
 
@@ -44,8 +75,12 @@ public class CompletedCheckpoint implements Serializable {
 
        private static final long serialVersionUID = -8360248179615702014L;
 
+       // 
------------------------------------------------------------------------
+
+       /** The ID of the job that the checkpoint belongs to */
        private final JobID job;
 
+       /** The ID (logical timestamp) of the checkpoint */
        private final long checkpointID;
 
        /** The timestamp when the checkpoint was triggered. */
@@ -60,23 +95,41 @@ public class CompletedCheckpoint implements Serializable {
        /** Properties for this checkpoint. */
        private final CheckpointProperties props;
 
-       /** External path if persisted checkpoint; <code>null</code> otherwise. 
*/
-       private final String externalPath;
+       /** The state handle to the externalized metadata, if the metadata has been externalized. */
+       @Nullable
+       private final StreamStateHandle externalizedMetadata;
+
+       /** External pointer to the completed checkpoint (for example file 
path) if externalized; null otherwise. */
+       @Nullable
+       private final String externalPointer;
 
        /** Optional stats tracker callback for discard. */
        @Nullable
-       private transient CompletedCheckpointStats.DiscardCallback 
discardCallback;
+       private transient volatile DiscardCallback discardCallback;
 
        // 
------------------------------------------------------------------------
 
-       public CompletedCheckpoint(
+       @VisibleForTesting
+       CompletedCheckpoint(
                        JobID job,
                        long checkpointID,
                        long timestamp,
                        long completionTimestamp,
                        Map<JobVertexID, TaskState> taskStates) {
 
-               this(job, checkpointID, timestamp, completionTimestamp, 
taskStates, CheckpointProperties.forStandardCheckpoint(), null);
+               this(job, checkpointID, timestamp, completionTimestamp, 
taskStates,
+                               CheckpointProperties.forStandardCheckpoint());
+       }
+
+       public CompletedCheckpoint(
+                       JobID job,
+                       long checkpointID,
+                       long timestamp,
+                       long completionTimestamp,
+                       Map<JobVertexID, TaskState> taskStates,
+                       CheckpointProperties props) {
+
+               this(job, checkpointID, timestamp, completionTimestamp, 
taskStates, props, null, null);
        }
 
        public CompletedCheckpoint(
@@ -86,24 +139,27 @@ public class CompletedCheckpoint implements Serializable {
                        long completionTimestamp,
                        Map<JobVertexID, TaskState> taskStates,
                        CheckpointProperties props,
-                       String externalPath) {
+                       @Nullable StreamStateHandle externalizedMetadata,
+                       @Nullable String externalPointer) {
 
                checkArgument(checkpointID >= 0);
                checkArgument(timestamp >= 0);
                checkArgument(completionTimestamp >= 0);
 
+               checkArgument((externalPointer == null) == 
(externalizedMetadata == null),
+                               "external pointer without externalized metadata 
must be both null or both non-null");
+
+               checkArgument(!props.externalizeCheckpoint() || externalPointer 
!= null, 
+                       "Checkpoint properties require externalized checkpoint, 
but checkpoint is not externalized");
+
                this.job = checkNotNull(job);
                this.checkpointID = checkpointID;
                this.timestamp = timestamp;
                this.duration = completionTimestamp - timestamp;
                this.taskStates = checkNotNull(taskStates);
                this.props = checkNotNull(props);
-               this.externalPath = externalPath;
-
-               if (props.externalizeCheckpoint() && externalPath == null) {
-                       throw new NullPointerException("Checkpoint properties 
say that the checkpoint " +
-                                       "should have been persisted, but 
missing external path.");
-               }
+               this.externalizedMetadata = externalizedMetadata;
+               this.externalPointer = externalPointer;
        }
 
        // 
------------------------------------------------------------------------
@@ -146,10 +202,9 @@ public class CompletedCheckpoint implements Serializable {
                        discard();
                        return true;
                } else {
-                       if (externalPath != null) {
+                       if (externalPointer != null) {
                                LOG.info("Persistent checkpoint with ID {} at 
'{}' not discarded.",
-                                               checkpointID,
-                                               externalPath);
+                                               checkpointID, externalPointer);
                        }
 
                        return false;
@@ -158,14 +213,36 @@ public class CompletedCheckpoint implements Serializable {
 
        void discard() throws Exception {
                try {
-                       if (externalPath != null) {
-                               
SavepointStore.removeSavepointFile(externalPath);
+                       // collect exceptions and continue cleanup
+                       Exception exception = null;
+
+                       // drop the metadata, if we have some
+                       if (externalizedMetadata != null) {
+                               try {
+                                       externalizedMetadata.discardState();
+                               }
+                               catch (Exception e) {
+                                       exception = e;
+                               }
                        }
 
-                       
StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
-               } finally {
+                       // drop the actual state
+                       try {
+                               
StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
+                       }
+                       catch (Exception e) {
+                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+                       }
+
+                       if (exception != null) {
+                               throw exception;
+                       }
+               }
+               finally {
                        taskStates.clear();
 
+                       // to be null-pointer safe, copy reference to stack
+                       DiscardCallback discardCallback = this.discardCallback;
                        if (discardCallback != null) {
                                discardCallback.notifyDiscardedCheckpoint();
                        }
@@ -190,8 +267,18 @@ public class CompletedCheckpoint implements Serializable {
                return taskStates.get(jobVertexID);
        }
 
-       public String getExternalPath() {
-               return externalPath;
+       public boolean isExternalized() {
+               return externalizedMetadata != null;
+       }
+
+       @Nullable
+       public StreamStateHandle getExternalizedMetadata() {
+               return externalizedMetadata;
+       }
+
+       @Nullable
+       public String getExternalPointer() {
+               return externalPointer;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index d2c0f6c..e91e038 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -72,4 +72,13 @@ public interface CompletedCheckpointStore {
         */
        int getNumberOfRetainedCheckpoints();
 
+       /**
+        * This method returns whether the completed checkpoint store requires 
checkpoints to be
+        * externalized. Externalized checkpoints have their metadata persisted, which the checkpoint
+        * store can exploit (for example by simply pointing to the persisted metadata).
+        * 
+        * @return True, if the store requires that checkpoints are 
externalized before being added, false
+        *         if the store stores the metadata itself.
+        */
+       boolean requiresExternalizedCheckpoints();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 908ff7f..2c392b8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -28,6 +29,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
@@ -41,7 +44,10 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,7 +80,7 @@ public class PendingCheckpoint {
 
        /**
         * The checkpoint properties. If the checkpoint should be persisted
-        * externally, it happens in {@link #finalizeCheckpoint()}.
+        * externally, it happens in {@link #finalizeCheckpointExternalized()}.
         */
        private final CheckpointProperties props;
 
@@ -203,46 +209,80 @@ public class PendingCheckpoint {
                return onCompletionPromise;
        }
 
-       public CompletedCheckpoint finalizeCheckpoint() {
+       public CompletedCheckpoint finalizeCheckpointExternalized() throws 
IOException {
                synchronized (lock) {
-                       Preconditions.checkState(isFullyAcknowledged(), 
"Pending checkpoint has not been fully acknowledged yet.");
-
-                       // Persist if required
-                       String externalPath = null;
-                       if (props.externalizeCheckpoint()) {
-                               try {
-                                       Savepoint savepoint = new 
SavepointV1(checkpointId, taskStates.values());
-                                       externalPath = 
SavepointStore.storeSavepoint(
-                                                       targetDirectory,
-                                                       savepoint
-                                       );
-                               } catch (IOException e) {
-                                       LOG.error("Failed to persist checkpoint 
{}.",checkpointId, e);
-                               }
-                       }
+                       checkState(isFullyAcknowledged(), "Pending checkpoint 
has not been fully acknowledged yet.");
 
-                       CompletedCheckpoint completed = new CompletedCheckpoint(
-                                       jobId,
-                                       checkpointId,
-                                       checkpointTimestamp,
-                                       System.currentTimeMillis(),
-                                       new HashMap<>(taskStates),
-                                       props,
-                                       externalPath);
+                       // make sure we fulfill the promise with an exception 
if something fails
+                       try {
+                               // externalize the metadata
+                               final Savepoint savepoint = new 
SavepointV1(checkpointId, taskStates.values());
 
-                       onCompletionPromise.complete(completed);
+                               // TEMP FIX - The savepoint store is currently strictly typed to file systems,
+                               //            but checkpoints are designed to be more generic. We need to work
+                               //            with file handles here until the savepoint serializer accepts a
+                               //            generic stream factory.
 
-                       if (statsCallback != null) {
-                               // Finalize the statsCallback and give the 
completed checkpoint a
-                               // callback for discards.
-                               CompletedCheckpointStats.DiscardCallback 
discardCallback = statsCallback.reportCompletedCheckpoint(externalPath);
-                               completed.setDiscardCallback(discardCallback);
+                               final FileStateHandle metadataHandle = 
SavepointStore.storeSavepointToHandle(targetDirectory, savepoint);
+                               final String externalPointer = 
metadataHandle.getFilePath().getParent().toString();
+
+                               return finalizeInternal(metadataHandle, 
externalPointer);
+                       }
+                       catch (Throwable t) {
+                               onCompletionPromise.completeExceptionally(t);
+                               ExceptionUtils.rethrowIOException(t);
+                               return null; // silence the compiler
+                       }
+               }
+       }
+
+       public CompletedCheckpoint finalizeCheckpointNonExternalized() {
+               synchronized (lock) {
+                       checkState(isFullyAcknowledged(), "Pending checkpoint 
has not been fully acknowledged yet.");
+
+                       // make sure we fulfill the promise with an exception 
if something fails
+                       try {
+                               // finalize without external metadata
+                               return finalizeInternal(null, null);
                        }
+                       catch (Throwable t) {
+                               onCompletionPromise.completeExceptionally(t);
+                               ExceptionUtils.rethrow(t);
+                               return null; // silence the compiler
+                       }
+               }
+       }
 
-                       dispose(false);
+       @GuardedBy("lock")
+       private CompletedCheckpoint finalizeInternal(
+                       @Nullable StreamStateHandle externalMetadata,
+                       @Nullable String externalPointer) {
 
-                       return completed;
+               assert(Thread.holdsLock(lock));
+
+               CompletedCheckpoint completed = new CompletedCheckpoint(
+                               jobId,
+                               checkpointId,
+                               checkpointTimestamp,
+                               System.currentTimeMillis(),
+                               new HashMap<>(taskStates),
+                               props,
+                               externalMetadata,
+                               externalPointer);
+
+               onCompletionPromise.complete(completed);
+
+               if (statsCallback != null) {
+                       // Finalize the statsCallback and give the completed 
checkpoint a
+                       // callback for discards.
+                       CompletedCheckpointStats.DiscardCallback 
discardCallback = 
+                                       
statsCallback.reportCompletedCheckpoint(externalPointer);
+                       completed.setDiscardCallback(discardCallback);
                }
+
+               // mark this pending checkpoint as disposed, but do NOT drop 
the state
+               dispose(false);
+
+               return completed;
        }
 
        /**
@@ -411,9 +451,9 @@ public class PendingCheckpoint {
                                                public void run() {
                                                        try {
                                                                
StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
-                                                       } catch (Exception e) {
-                                                               LOG.warn("Could 
not properly dispose the pending checkpoint " +
-                                                                       "{} of 
job {}.", checkpointId, jobId, e);
+                                                       } catch (Throwable t) {
+                                                               LOG.warn("Could 
not properly dispose the pending checkpoint {} of job {}.", 
+                                                                               
checkpointId, jobId, t);
                                                        } finally {
                                                                
taskStates.clear();
                                                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 082bca9..a0248b2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -96,4 +96,8 @@ public class StandaloneCompletedCheckpointStore implements 
CompletedCheckpointSt
                }
        }
 
+       @Override
+       public boolean requiresExternalizedCheckpoints() {
+               return false;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index fdd0d40..4b03cea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -125,6 +125,11 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
                LOG.info("Initialized in '{}'.", checkpointsPath);
        }
 
+       @Override
+       public boolean requiresExternalizedCheckpoints() {
+               return true;
+       }
+
        /**
         * Gets the latest checkpoint from ZooKeeper and removes all others.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index 950a9a0..60f0287 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -19,11 +19,13 @@
 package org.apache.flink.runtime.checkpoint.savepoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,22 +48,27 @@ public class SavepointLoader {
         * @param jobId          The JobID of the job to load the savepoint for.
         * @param tasks          Tasks that will possibly be reset
         * @param savepointPath  The path of the savepoint to rollback to
-        * @param userClassLoader The user code classloader
+        * @param classLoader    The class loader to resolve serialized classes 
in legacy savepoint versions.
         * @param allowNonRestoredState Allow to skip checkpoint state that 
cannot be mapped
         * to any job vertex in tasks.
         *
         * @throws IllegalStateException If mismatch between program and 
savepoint state
-        * @throws Exception             If savepoint store failure
+        * @throws IOException           If the savepoint cannot be loaded
         */
        public static CompletedCheckpoint loadAndValidateSavepoint(
                        JobID jobId,
                        Map<JobVertexID, ExecutionJobVertex> tasks,
                        String savepointPath,
-                       ClassLoader userClassLoader,
+                       ClassLoader classLoader,
                        boolean allowNonRestoredState) throws IOException {
 
                // (1) load the savepoint
-               Savepoint savepoint = 
SavepointStore.loadSavepoint(savepointPath, userClassLoader);
+               final Tuple2<Savepoint, StreamStateHandle> savepointAndHandle = 
+                               
SavepointStore.loadSavepointWithHandle(savepointPath, classLoader);
+
+               final Savepoint savepoint = savepointAndHandle.f0;
+               final StreamStateHandle metadataHandle = savepointAndHandle.f1;
+
                final Map<JobVertexID, TaskState> taskStates = new 
HashMap<>(savepoint.getTaskStates().size());
 
                boolean expandedToLegacyIds = false;
@@ -114,10 +121,12 @@ public class SavepointLoader {
 
                // (3) convert to checkpoint so the system can fall back to it
                CheckpointProperties props = 
CheckpointProperties.forStandardSavepoint();
-               return new CompletedCheckpoint(jobId, 
savepoint.getCheckpointId(), 0L, 0L, taskStates, props, savepointPath);
+               return new CompletedCheckpoint(jobId, 
savepoint.getCheckpointId(), 0L, 0L,
+                               taskStates, props, metadataHandle, 
savepointPath);
        }
 
        // 
------------------------------------------------------------------------
 
+       /** This class is not meant to be instantiated. */
        private SavepointLoader() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 95370a5..5c8ac6b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -19,12 +19,15 @@
 package org.apache.flink.runtime.checkpoint.savepoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -118,6 +121,28 @@ public class SavepointStore {
         * @throws IOException Failures during store are forwarded
         */
        public static <T extends Savepoint> String storeSavepoint(String 
directory, T savepoint) throws IOException {
+               // write the metadata file and obtain a file state handle to it
+               FileStateHandle metadataFileHandle = 
storeSavepointToHandle(directory, savepoint);
+
+               // we return the savepoint directory path here!
+               // The directory path also works to resume from and is more 
elegant than the direct
+               // metadata file pointer
+               return metadataFileHandle.getFilePath().getParent().toString(); // parent of the metadata file
+       }
+
+       /**
+        * Stores the savepoint metadata to a file and returns a state handle to that file.
+        *
+        * @param directory Target directory to store savepoint in
+        * @param savepoint Savepoint to be stored
+        *
+        * @return File state handle to the stored savepoint metadata
+        * @throws IOException Failures during store are forwarded
+        */
+       public static <T extends Savepoint> FileStateHandle 
storeSavepointToHandle(
+                       String directory,
+                       T savepoint) throws IOException {
+
                checkNotNull(directory, "Target directory");
                checkNotNull(savepoint, "Savepoint");
 
@@ -127,10 +152,9 @@ public class SavepointStore {
                final FileSystem fs = FileSystem.get(basePath.toUri());
 
                boolean success = false;
-               try (FSDataOutputStream fdos = fs.create(metadataFilePath, 
WriteMode.NO_OVERWRITE); 
+               try (FSDataOutputStream fdos = fs.create(metadataFilePath, 
WriteMode.NO_OVERWRITE);
                                DataOutputStream dos = new 
DataOutputStream(fdos))
                {
-
                        // Write header
                        dos.writeInt(MAGIC_NUMBER);
                        dos.writeInt(savepoint.getVersion());
@@ -138,7 +162,13 @@ public class SavepointStore {
                        // Write savepoint
                        SavepointSerializer<T> serializer = 
SavepointSerializers.getSerializer(savepoint);
                        serializer.serialize(savepoint, dos);
+
+                       // construct result handle
+                       FileStateHandle handle = new 
FileStateHandle(metadataFilePath, dos.size());
+
+                       // all good!
                        success = true;
+                       return handle;
                }
                finally {
                        if (!success && fs.exists(metadataFilePath)) {
@@ -147,22 +177,37 @@ public class SavepointStore {
                                }
                        }
                }
-
-               // we return the savepoint directory path here!
-               // The directory path also works to resume from and is more 
elegant than the direct
-               // metadata file pointer
-               return basePath.toString();
        }
 
        /**
         * Loads the savepoint at the specified path.
         *
         * @param savepointFileOrDirectory Path to the parent savepoint 
directory or the meta data file.
+        * @param classLoader The class loader used to resolve serialized 
classes from legacy savepoint formats.
         * @return The loaded savepoint
+        *
         * @throws IOException Failures during load are forwarded
         */
-       public static Savepoint loadSavepoint(String savepointFileOrDirectory, 
ClassLoader userClassLoader) throws IOException {
-               Preconditions.checkNotNull(savepointFileOrDirectory, "Path");
+       public static Savepoint loadSavepoint(String savepointFileOrDirectory, 
ClassLoader classLoader) throws IOException {
+               return loadSavepointWithHandle(savepointFileOrDirectory, 
classLoader).f0; // f0 is the savepoint; the metadata handle (f1) is not needed here
+       }
+
+       /**
+        * Loads the savepoint at the specified path. This method returns the 
savepoint, as well as the
+        * handle to the metadata.
+        *
+        * @param savepointFileOrDirectory Path to the parent savepoint 
directory or the meta data file.
+        * @param classLoader The class loader used to resolve serialized 
classes from legacy savepoint formats.
+        * @return A tuple of the loaded savepoint (f0) and the handle to its metadata file (f1)
+        *
+        * @throws IOException Failures during load are forwarded
+        */
+       public static Tuple2<Savepoint, StreamStateHandle> 
loadSavepointWithHandle(
+                       String savepointFileOrDirectory,
+                       ClassLoader classLoader) throws IOException {
+               
+               checkNotNull(savepointFileOrDirectory, 
"savepointFileOrDirectory");
+               checkNotNull(classLoader, "classLoader");
 
                Path path = new Path(savepointFileOrDirectory);
 
@@ -180,11 +225,13 @@ public class SavepointStore {
                                LOG.info("Using savepoint file in {}", path);
                        } else {
                                throw new IOException("Cannot find meta data 
file in directory " + path
-                                       + ". Please try to load the savepoint 
directly from the meta data file "
-                                       + "instead of the directory.");
+                                               + ". Please try to load the 
savepoint directly from the meta data file "
+                                               + "instead of the directory.");
                        }
                }
 
+               // load the savepoint
+               final Savepoint savepoint;
                try (DataInputStream dis = new 
DataInputViewStreamWrapper(fs.open(path))) {
                        int magicNumber = dis.readInt();
 
@@ -192,15 +239,27 @@ public class SavepointStore {
                                int version = dis.readInt();
 
                                SavepointSerializer<?> serializer = 
SavepointSerializers.getSerializer(version);
-                               return serializer.deserialize(dis, 
userClassLoader);
+                               savepoint = serializer.deserialize(dis, 
classLoader);
                        } else {
-                               throw new RuntimeException("Unexpected magic 
number. This is most likely " +
-                                               "caused by trying to load a 
Flink 1.0 savepoint. You cannot load a " +
-                                               "savepoint triggered by Flink 
1.0 with this version of Flink. If it is " +
-                                               "_not_ a Flink 1.0 savepoint, 
this error indicates that the specified " +
-                                               "file is not a proper savepoint 
or the file has been corrupted.");
+                               throw new RuntimeException("Unexpected magic 
number. This can have multiple reasons: " +
+                                               "(1) You are trying to load a 
Flink 1.0 savepoint, which is not supported by this " +
+                                               "version of Flink. (2) The file 
you were pointing to is not a savepoint at all. " +
+                                               "(3) The savepoint file has 
been corrupted.");
                        }
                }
+
+               // construct the stream handle to the metadata file
+               // we get the size best-effort
+               long size = 0;
+               try {
+                       size = fs.getFileStatus(path).getLen();
+               }
+               catch (Exception ignored) {
+                       // we don't know the size, but we don't want to fail 
the savepoint loading for that
+               }
+               StreamStateHandle metadataHandle = new FileStateHandle(path, 
size);
+
+               return new Tuple2<>(savepoint, metadataHandle);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
index c6f5c86..b250831 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FutureUtil;
 
 import java.util.concurrent.RunnableFuture;
@@ -42,26 +43,22 @@ public class StateUtil {
                        Iterable<? extends StateObject> handlesToDiscard) 
throws Exception {
 
                if (handlesToDiscard != null) {
-
-                       Exception suppressedExceptions = null;
+                       Exception exception = null;
 
                        for (StateObject state : handlesToDiscard) {
 
                                if (state != null) {
                                        try {
                                                state.discardState();
-                                       } catch (Exception ex) {
-                                               //best effort to still cleanup 
other states and deliver exceptions in the end
-                                               if (suppressedExceptions == 
null) {
-                                                       suppressedExceptions = 
new Exception(ex);
-                                               }
-                                               
suppressedExceptions.addSuppressed(ex);
+                                       }
+                                       catch (Exception ex) {
+                                               exception = 
ExceptionUtils.firstOrSuppressed(ex, exception);
                                        }
                                }
                        }
 
-                       if (suppressedExceptions != null) {
-                               throw suppressedExceptions;
+                       if (exception != null) {
+                               throw exception;
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 21749cb..87cd4ac 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager
 
-import java.io.{File, IOException}
+import java.io.IOException
 import java.net._
 import java.util.UUID
 import java.util.concurrent.{TimeUnit, Future => _, TimeoutException => _, _}
@@ -50,7 +50,7 @@ import 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.executiongraph._
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, 
InstanceManager}
-import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
+import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus}
 import 
org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway
@@ -77,7 +77,7 @@ import 
org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
-import org.apache.flink.runtime.{FlinkActor, JobException, 
LeaderSessionMessageFilter, LogMessages}
+import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, 
LogMessages}
 import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils}
 import org.jboss.netty.channel.ChannelException
 
@@ -611,7 +611,7 @@ class JobManager(
                 new BiFunction[CompletedCheckpoint, Throwable, Void] {
                   override def apply(success: CompletedCheckpoint, cause: 
Throwable): Void = {
                     if (success != null) {
-                      val path = success.getExternalPath()
+                      val path = success.getExternalPointer()
                       log.info(s"Savepoint stored in $path. Now cancelling 
$jobId.")
                       executionGraph.cancel()
                       senderRef ! decorateMessage(CancellationSuccess(jobId, 
path))
@@ -787,11 +787,11 @@ class JobManager(
                 new BiFunction[CompletedCheckpoint, Throwable, Void] {
                   override def apply(success: CompletedCheckpoint, cause: 
Throwable): Void = {
                     if (success != null) {
-                      if (success.getExternalPath != null) {
+                      if (success.getExternalPointer != null) {
                         senderRef ! TriggerSavepointSuccess(
                           jobId,
                           success.getCheckpointID,
-                          success.getExternalPath,
+                          success.getExternalPointer,
                           success.getTimestamp
                         )
                       } else {
@@ -1784,7 +1784,7 @@ class JobManager(
       case t: Throwable =>
         log.error(s"Could not properly unregister job $jobID form the library 
cache.", t)
     }
-    jobManagerMetricGroup.map(_.removeJob(jobID))
+    jobManagerMetricGroup.foreach(_.removeJob(jobID))
 
     futureOption
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index d4c3a2d..9517257 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -134,5 +134,10 @@ public class CheckpointCoordinatorFailureTest extends 
TestLogger {
                public int getNumberOfRetainedCheckpoints() {
                        return -1;
                }
+
+               @Override
+               public boolean requiresExternalizedCheckpoints() {
+                       return false; // test store: no externalized checkpoint metadata required
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 725b85f..f77c755 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -237,7 +237,7 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
                        Map<JobVertexID, TaskState> taskGroupStates,
                        CheckpointProperties props) {
 
-                       super(jobId, checkpointId, timestamp, Long.MAX_VALUE, 
taskGroupStates, props, null);
+                       super(jobId, checkpointId, timestamp, Long.MAX_VALUE, 
taskGroupStates, props);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 0d933ff..b34e9a6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -19,9 +19,11 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -55,7 +57,9 @@ public class CompletedCheckpointTest {
 
                // Verify discard call is forwarded to state
                CompletedCheckpoint checkpoint = new CompletedCheckpoint(
-                               new JobID(), 0, 0, 1, taskStates, 
CheckpointProperties.forStandardCheckpoint(), file.getAbsolutePath());
+                               new JobID(), 0, 0, 1, taskStates, 
CheckpointProperties.forStandardCheckpoint(),
+                               new FileStateHandle(new Path(file.toURI()), 
file.length()),
+                               file.getAbsolutePath());
 
                checkpoint.discard(JobStatus.FAILED);
 
@@ -74,7 +78,7 @@ public class CompletedCheckpointTest {
                boolean discardSubsumed = true;
                CheckpointProperties props = new CheckpointProperties(false, 
false, discardSubsumed, true, true, true, true);
                CompletedCheckpoint checkpoint = new CompletedCheckpoint(
-                               new JobID(), 0, 0, 1, taskStates, props, null);
+                               new JobID(), 0, 0, 1, taskStates, props);
 
                // Subsume
                checkpoint.subsume();
@@ -104,7 +108,9 @@ public class CompletedCheckpointTest {
                        // Keep
                        CheckpointProperties props = new 
CheckpointProperties(false, true, false, false, false, false, false);
                        CompletedCheckpoint checkpoint = new 
CompletedCheckpoint(
-                                       new JobID(), 0, 0, 1, new 
HashMap<>(taskStates), props, externalPath);
+                                       new JobID(), 0, 0, 1, new 
HashMap<>(taskStates), props,
+                                       new FileStateHandle(new 
Path(file.toURI()), file.length()),
+                                       externalPath);
 
                        checkpoint.discard(status);
                        verify(state, times(0)).discardState();
@@ -113,7 +119,7 @@ public class CompletedCheckpointTest {
                        // Discard
                        props = new CheckpointProperties(false, false, true, 
true, true, true, true);
                        checkpoint = new CompletedCheckpoint(
-                                       new JobID(), 0, 0, 1, new 
HashMap<>(taskStates), props, null);
+                                       new JobID(), 0, 0, 1, new 
HashMap<>(taskStates), props);
 
                        checkpoint.discard(status);
                        verify(state, times(1)).discardState();
@@ -135,8 +141,7 @@ public class CompletedCheckpointTest {
                        0,
                        1,
                        new HashMap<>(taskStates),
-                       CheckpointProperties.forStandardCheckpoint(),
-                       null);
+                       CheckpointProperties.forStandardCheckpoint());
 
                CompletedCheckpointStats.DiscardCallback callback = 
mock(CompletedCheckpointStats.DiscardCallback.class);
                completed.setDiscardCallback(callback);

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 3a85c4c..6f04f39 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -24,9 +24,11 @@ import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+
 import org.mockito.Mockito;
 
 import java.io.File;
@@ -49,9 +51,6 @@ import static org.mockito.Mockito.verify;
 
 public class PendingCheckpointTest {
 
-       @Rule
-       public TemporaryFolder tmpFolder = new TemporaryFolder();
-
        private static final Map<ExecutionAttemptID, ExecutionVertex> ACK_TASKS 
= new HashMap<>();
        private static final ExecutionAttemptID ATTEMPT_ID = new 
ExecutionAttemptID();
 
@@ -59,6 +58,9 @@ public class PendingCheckpointTest {
                ACK_TASKS.put(ATTEMPT_ID, mock(ExecutionVertex.class));
        }
 
+       @Rule
+       public final TemporaryFolder tmpFolder = new TemporaryFolder();
+
        /**
         * Tests that pending checkpoints can be subsumed iff they are forced.
         */
@@ -96,7 +98,7 @@ public class PendingCheckpointTest {
                PendingCheckpoint pending = createPendingCheckpoint(persisted, 
tmp.getAbsolutePath());
                pending.acknowledgeTask(ATTEMPT_ID, null, new 
CheckpointMetrics());
                assertEquals(0, tmp.listFiles().length);
-               pending.finalizeCheckpoint();
+               pending.finalizeCheckpointExternalized();
                assertEquals(1, tmp.listFiles().length);
 
                // Ephemeral checkpoint
@@ -105,7 +107,7 @@ public class PendingCheckpointTest {
                pending.acknowledgeTask(ATTEMPT_ID, null, new 
CheckpointMetrics());
 
                assertEquals(1, tmp.listFiles().length);
-               pending.finalizeCheckpoint();
+               pending.finalizeCheckpointNonExternalized();
                assertEquals(1, tmp.listFiles().length);
        }
 
@@ -148,7 +150,8 @@ public class PendingCheckpointTest {
 
                assertFalse(future.isDone());
                pending.acknowledgeTask(ATTEMPT_ID, null, new 
CheckpointMetrics());
-               pending.finalizeCheckpoint();
+               assertTrue(pending.isFullyAcknowledged());
+               pending.finalizeCheckpointExternalized();
                assertTrue(future.isDone());
 
                // Finalize (missing ACKs)
@@ -157,7 +160,13 @@ public class PendingCheckpointTest {
 
                assertFalse(future.isDone());
                try {
-                       pending.finalizeCheckpoint();
+                       pending.finalizeCheckpointNonExternalized();
+                       fail("Did not throw expected Exception");
+               } catch (IllegalStateException ignored) {
+                       // Expected
+               }
+               try {
+                       pending.finalizeCheckpointExternalized();
                        fail("Did not throw expected Exception");
                } catch (IllegalStateException ignored) {
                        // Expected
@@ -233,7 +242,7 @@ public class PendingCheckpointTest {
                        pending.acknowledgeTask(ATTEMPT_ID, null, new 
CheckpointMetrics());
                        verify(callback, 
times(1)).reportSubtaskStats(any(JobVertexID.class), 
any(SubtaskStateStats.class));
 
-                       pending.finalizeCheckpoint();
+                       pending.finalizeCheckpointNonExternalized();
                        verify(callback, 
times(1)).reportCompletedCheckpoint(any(String.class));
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 5a38be2..cbb077c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -483,6 +483,10 @@ public class JobManagerHARecoveryTest {
                        return checkpoints.size();
                }
 
+               @Override
+               public boolean requiresExternalizedCheckpoints() {
+                       return false; // test store: no externalized checkpoint metadata required
+               }
        }
 
        static class MyCheckpointRecoveryFactory implements 
CheckpointRecoveryFactory {

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 60b12d2..75f1fd4 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -979,7 +979,7 @@ class JobManagerITCase(_system: ActorSystem)
           jobManager.tell(TriggerSavepoint(jobGraph.getJobID(), 
Option.apply("any")), testActor)
 
           val checkpoint = Mockito.mock(classOf[CompletedCheckpoint])
-          when(checkpoint.getExternalPath).thenReturn("Expected test savepoint 
path")
+          when(checkpoint.getExternalPointer).thenReturn("Expected test 
savepoint path")
 
           // Succeed the promise
           savepointPromise.complete(checkpoint)

http://git-wip-us.apache.org/repos/asf/flink/blob/5b7f21d8/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 60a3a62..f910e49 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -167,7 +167,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
                config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
Parallelism);
 
                ActorSystem testSystem = null;
-               JobManagerProcess[] jobManagerProcess = new 
JobManagerProcess[2];
+               final JobManagerProcess[] jobManagerProcess = new 
JobManagerProcess[2];
                LeaderRetrievalService leaderRetrievalService = null;
                ActorSystem taskManagerSystem = null;
 

Reply via email to