[FLINK-6390] [checkpoints] Add API for checkpoints that are triggered via external systems

This includes
  - An interface for hooks that are called by the checkpoint coordinator to trigger/restore a checkpoint
  - A source extension that triggers the operator checkpoints and barrier injection on certain events

Because this changes the checkpoint metadata format, the commit introduces a new metadata format version.

This closes #3782


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90ca4381
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90ca4381
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90ca4381

Branch: refs/heads/master
Commit: 90ca438106e63c5032ee2ad27e54e9f573eac386
Parents: 6bdaf1e
Author: Stephan Ewen <[email protected]>
Authored: Mon Mar 27 17:20:47 2017 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Apr 26 21:56:45 2017 +0200

----------------------------------------------------------------------
 .../core/io/SimpleVersionedSerializer.java      |  80 ++++
 .../java/org/apache/flink/util/StringUtils.java |   7 +
 .../checkpoint/savepoint/SavepointV0.java       |  10 +
 .../savepoint/SavepointV0Serializer.java        |  12 +-
 .../checkpoint/CheckpointCoordinator.java       |  80 +++-
 .../checkpoint/CheckpointDeclineReason.java     |   4 +-
 .../runtime/checkpoint/CompletedCheckpoint.java |  31 +-
 .../flink/runtime/checkpoint/MasterState.java   |  62 +++
 .../checkpoint/MasterTriggerRestoreHook.java    | 140 ++++++
 .../runtime/checkpoint/PendingCheckpoint.java   |  23 +-
 .../runtime/checkpoint/hooks/MasterHooks.java   | 273 +++++++++++
 .../runtime/checkpoint/savepoint/Savepoint.java |   6 +
 .../checkpoint/savepoint/SavepointLoader.java   |   3 +-
 .../savepoint/SavepointSerializers.java         |   7 +-
 .../checkpoint/savepoint/SavepointV1.java       |  36 +-
 .../savepoint/SavepointV1Serializer.java        |  76 +--
 .../checkpoint/savepoint/SavepointV2.java       |  91 ++++
 .../savepoint/SavepointV2Serializer.java        | 468 +++++++++++++++++++
 .../runtime/executiongraph/ExecutionGraph.java  |   9 +
 .../executiongraph/ExecutionGraphBuilder.java   |  18 +
 .../tasks/JobCheckpointingSettings.java         |  48 +-
 .../CheckpointCoordinatorMasterHooksTest.java   | 421 +++++++++++++++++
 .../CompletedCheckpointStoreTest.java           |   2 +-
 .../checkpoint/CompletedCheckpointTest.java     |  24 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java |   1 +
 .../savepoint/CheckpointTestUtils.java          | 184 ++++++++
 .../savepoint/SavepointLoaderTest.java          |   2 +-
 .../savepoint/SavepointStoreTest.java           |  27 +-
 .../savepoint/SavepointV1SerializerTest.java    |  17 +-
 .../checkpoint/savepoint/SavepointV1Test.java   | 157 -------
 .../savepoint/SavepointV2SerializerTest.java    | 148 ++++++
 .../checkpoint/savepoint/SavepointV2Test.java   |  68 +++
 .../ArchivedExecutionGraphTest.java             |   2 +
 .../api/checkpoint/ExternallyInducedSource.java |  75 +++
 .../checkpoint/WithMasterCheckpointHook.java    |  38 ++
 .../FunctionMasterCheckpointHookFactory.java    |  45 ++
 .../api/graph/StreamingJobGraphGenerator.java   |  28 +-
 .../runtime/tasks/SourceStreamTask.java         |  56 +++
 .../WithMasterCheckpointHookConfigTest.java     | 189 ++++++++
 .../runtime/io/StreamRecordWriterTest.java      |   5 -
 .../SourceExternalCheckpointTriggerTest.java    | 171 +++++++
 .../runtime/tasks/StreamTaskTestHarness.java    |   7 +-
 .../test/checkpointing/SavepointITCase.java     |   4 +-
 43 files changed, 2874 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
new file mode 100644
index 0000000..6c061a5
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import java.io.IOException;
+
+/**
+ * A simple serializer interface for versioned serialization.
+ * 
+ * <p>The serializer has a version (returned by {@link #getVersion()}) which can be attached
+ * to the serialized data. When the serializer evolves, the version can be used to identify
+ * with which prior version the data was serialized.
+ * 
+ * <pre>{@code
+ * MyType someObject = ...;
+ * SimpleVersionedSerializer<MyType> serializer = ...;
+ *
+ * byte[] serializedData = serializer.serialize(someObject);
+ * int version = serializer.getVersion();
+ *
+ * MyType deserialized = serializer.deserialize(version, serializedData);
+ * 
+ * byte[] someOldData = ...;
+ * int oldVersion = ...;
+ * MyType deserializedOldObject = serializer.deserialize(oldVersion, someOldData);
+ * 
+ * }</pre>
+ * 
+ * @param <E> The data type serialized / deserialized by this serializer.
+ */
+public interface SimpleVersionedSerializer<E> extends Versioned {
+
+       /**
+        * Gets the version with which this serializer serializes.
+        * 
+        * @return The version of the serialization schema.
+        */
+       @Override
+       int getVersion();
+
+       /**
+        * Serializes the given object. The serialization is assumed to correspond to the
+        * current serialization version (as returned by {@link #getVersion()}).
+        *
+        * 
+        * @param obj The object to serialize.
+        * @return The serialized data (bytes).
+        * 
+        * @throws IOException Thrown, if the serialization fails.
+        */
+       byte[] serialize(E obj) throws IOException;
+
+       /**
+        * De-serializes the given data (bytes) which was serialized with the scheme of the
+        * indicated version.
+        * 
+        * @param version The version in which the data was serialized
+        * @param serialized The serialized data
+        * @return The deserialized object
+        * 
+        * @throws IOException Thrown, if the deserialization fails.
+        */
+       E deserialize(int version, byte[] serialized) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index b84f602..abd6ba6 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -309,6 +309,13 @@ public final class StringUtils {
                }
        }
 
+       /**
+        * Checks if the string is null, empty, or contains only whitespace 
characters.
+        * A whitespace character is defined via {@link 
Character#isWhitespace(char)}.
+        * 
+        * @param str The string to check
+        * @return True, if the string is null or blank, false otherwise.
+        */
        public static boolean isNullOrWhitespaceOnly(String str) {
                if (str == null || str.length() == 0) {
                        return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
 
b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
index 1c51a69..f3ec1cf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0.java
@@ -19,6 +19,7 @@
 package org.apache.flink.migration.runtime.checkpoint.savepoint;
 
 import org.apache.flink.migration.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
 import org.apache.flink.util.Preconditions;
 
@@ -58,6 +59,15 @@ public class SavepointV0 implements Savepoint {
 
        @Override
        public Collection<org.apache.flink.runtime.checkpoint.TaskState> getTaskStates() {
+               // checkpoints are never deserialized into this legacy format (the V0
+               // serializer converts to the current format), so this should never be called
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public Collection<MasterState> getMasterStates() {
+               // checkpoints are never deserialized into this legacy format (the V0
+               // serializer converts to the current format), so this should never be called
                throw new UnsupportedOperationException();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
index 4739033..d285906 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
@@ -34,7 +34,7 @@ import 
org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskStateList;
 import org.apache.flink.migration.util.SerializedValue;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -68,7 +68,7 @@ import java.util.Map;
  * don't rely on any involved Java classes to stay the same.
  */
 @SuppressWarnings("deprecation")
-public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> 
{
+public class SavepointV0Serializer implements SavepointSerializer<SavepointV2> 
{
 
        public static final SavepointV0Serializer INSTANCE = new 
SavepointV0Serializer();
        private static final StreamStateHandle SIGNAL_0 = new 
ByteStreamStateHandle("SIGNAL_0", new byte[]{0});
@@ -81,12 +81,12 @@ public class SavepointV0Serializer implements 
SavepointSerializer<SavepointV1> {
 
 
        @Override
-       public void serialize(SavepointV1 savepoint, DataOutputStream dos) 
throws IOException {
+       public void serialize(SavepointV2 savepoint, DataOutputStream dos) 
throws IOException {
                throw new UnsupportedOperationException("This serializer is 
read-only and only exists for backwards compatibility");
        }
 
        @Override
-       public SavepointV1 deserialize(DataInputStream dis, ClassLoader 
userClassLoader) throws IOException {
+       public SavepointV2 deserialize(DataInputStream dis, ClassLoader 
userClassLoader) throws IOException {
 
                long checkpointId = dis.readLong();
 
@@ -165,7 +165,7 @@ public class SavepointV0Serializer implements 
SavepointSerializer<SavepointV1> {
                return serializedValue;
        }
 
-       private SavepointV1 convertSavepoint(
+       private SavepointV2 convertSavepoint(
                        List<TaskState> taskStates,
                        ClassLoader userClassLoader,
                        long checkpointID) throws Exception {
@@ -176,7 +176,7 @@ public class SavepointV0Serializer implements 
SavepointSerializer<SavepointV1> {
                        newTaskStates.add(convertTaskState(taskState, 
userClassLoader, checkpointID));
                }
 
-               return new SavepointV1(checkpointID, newTaskStates);
+               return new SavepointV2(checkpointID, newTaskStates);
        }
 
        private org.apache.flink.runtime.checkpoint.TaskState convertTaskState(

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 256321e..23a38d4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
@@ -39,7 +41,10 @@ import 
org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -138,6 +143,9 @@ public class CheckpointCoordinator {
        /** The timer that handles the checkpoint timeouts and triggers 
periodic checkpoints */
        private final ScheduledThreadPoolExecutor timer;
 
+       /** The master checkpoint hooks executed by this checkpoint coordinator 
*/
+       private final HashMap<String, MasterTriggerRestoreHook<?>> masterHooks;
+
        /** Actor that receives status updates from the execution graph this 
coordinator works for */
        private JobStatusListener jobStatusListener;
 
@@ -220,6 +228,7 @@ public class CheckpointCoordinator {
                this.executor = checkNotNull(executor);
 
                this.recentPendingCheckpoints = new 
ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
+               this.masterHooks = new HashMap<>();
 
                this.timer = new ScheduledThreadPoolExecutor(1,
                                new 
DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Checkpoint 
Timer"));
@@ -245,6 +254,45 @@ public class CheckpointCoordinator {
                }
        }
 
+       // --------------------------------------------------------------------------------------------
+       //  Configuration
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Adds the given master hook to the checkpoint coordinator. This method does nothing if
+        * the checkpoint coordinator already contained a hook with the same ID (as defined via
+        * {@link MasterTriggerRestoreHook#getIdentifier()}).
+        * 
+        * @param hook The hook to add.
+        * @return True, if the hook was added, false if the checkpoint coordinator already
+        *         contained a hook with the same ID.
+        */
+       public boolean addMasterHook(MasterTriggerRestoreHook<?> hook) {
+               checkNotNull(hook);
+
+               final String id = hook.getIdentifier();
+               checkArgument(!StringUtils.isNullOrWhitespaceOnly(id), "The hook has a null or empty id");
+
+               synchronized (lock) {
+                       if (!masterHooks.containsKey(id)) {
+                               masterHooks.put(id, hook);
+                               return true;
+                       }
+                       else {
+                               return false;
+                       }
+               }
+       }
+
+       /**
+        * Gets the number of currently registered master hooks.
+        */
+       public int getNumberOfRegisteredMasterHooks() {
+               synchronized (lock) {
+                       return masterHooks.size();
+               }
+       }
+
        /**
         * Sets the checkpoint stats tracker.
         *
@@ -492,6 +540,20 @@ public class CheckpointCoordinator {
                                checkpoint.setStatsCallback(callback);
                        }
 
+                       // trigger the master hooks for the checkpoint
+                       try {
+                               List<MasterState> masterStates = 
MasterHooks.triggerMasterHooks(masterHooks.values(),
+                                               checkpointID, timestamp, 
executor, Time.milliseconds(checkpointTimeout));
+
+                               for (MasterState s : masterStates) {
+                                       checkpoint.addMasterState(s);
+                               }
+                       }
+                       catch (FlinkException e) {
+                               checkpoint.abortError(e);
+                               return new 
CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
+                       }
+
                        // schedule the timer that will clean up the expired 
checkpoints
                        final Runnable canceller = new Runnable() {
                                @Override
@@ -962,13 +1024,25 @@ public class CheckpointCoordinator {
 
                        LOG.info("Restoring from latest valid checkpoint: {}.", 
latest);
 
+                       // re-assign the task states
+
                        final Map<JobVertexID, TaskState> taskStates = 
latest.getTaskStates();
 
                        StateAssignmentOperation stateAssignmentOperation =
                                        new StateAssignmentOperation(LOG, 
tasks, taskStates, allowNonRestoredState);
-
                        stateAssignmentOperation.assignStates();
 
+                       // call master hooks for restore
+
+                       MasterHooks.restoreMasterHooks(
+                                       masterHooks,
+                                       latest.getMasterHookStates(),
+                                       latest.getCheckpointID(),
+                                       allowNonRestoredState,
+                                       LOG);
+
+                       // update metrics
+
                        if (statsTracker != null) {
                                long restoreTimestamp = 
System.currentTimeMillis();
                                RestoredCheckpointStats restored = new 
RestoredCheckpointStats(
@@ -1022,9 +1096,9 @@ public class CheckpointCoordinator {
                return restoreLatestCheckpointedState(tasks, true, 
allowNonRestored);
        }
 
-       // 
--------------------------------------------------------------------------------------------
+       // 
------------------------------------------------------------------------
        //  Accessors
-       // 
--------------------------------------------------------------------------------------------
+       // 
------------------------------------------------------------------------
 
        public int getNumberOfPendingCheckpoints() {
                return this.pendingCheckpoints.size();

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
index 60fe657..41c50cc0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
@@ -36,7 +36,9 @@ public enum CheckpointDeclineReason {
 
        NOT_ALL_REQUIRED_TASKS_RUNNING("Not all required tasks are currently 
running."),
 
-       EXCEPTION("An Exception occurred while triggering the checkpoint.");
+       EXCEPTION("An Exception occurred while triggering the checkpoint."),
+
+       EXPIRED("The checkpoint expired before triggering was complete");
 
        // 
------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 79fc31f..bb49b45 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -27,11 +27,16 @@ import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -90,11 +95,14 @@ public class CompletedCheckpoint implements Serializable {
        private final long duration;
 
        /** States of the different task groups belonging to this checkpoint */
-       private final Map<JobVertexID, TaskState> taskStates;
+       private final HashMap<JobVertexID, TaskState> taskStates;
 
        /** Properties for this checkpoint. */
        private final CheckpointProperties props;
 
+       /** States that were created by a hook on the master (in the checkpoint 
coordinator) */
+       private final Collection<MasterState> masterHookStates;
+
        /** The state handle to the externalized meta data, if the metadata has 
been externalized */
        @Nullable
        private final StreamStateHandle externalizedMetadata;
@@ -118,6 +126,7 @@ public class CompletedCheckpoint implements Serializable {
                        Map<JobVertexID, TaskState> taskStates) {
 
                this(job, checkpointID, timestamp, completionTimestamp, 
taskStates,
+                               Collections.<MasterState>emptyList(),
                                CheckpointProperties.forStandardCheckpoint());
        }
 
@@ -127,9 +136,11 @@ public class CompletedCheckpoint implements Serializable {
                        long timestamp,
                        long completionTimestamp,
                        Map<JobVertexID, TaskState> taskStates,
+                       @Nullable Collection<MasterState> masterHookStates,
                        CheckpointProperties props) {
 
-               this(job, checkpointID, timestamp, completionTimestamp, 
taskStates, props, null, null);
+               this(job, checkpointID, timestamp, completionTimestamp, 
taskStates, 
+                               masterHookStates, props, null, null);
        }
 
        public CompletedCheckpoint(
@@ -138,6 +149,7 @@ public class CompletedCheckpoint implements Serializable {
                        long timestamp,
                        long completionTimestamp,
                        Map<JobVertexID, TaskState> taskStates,
+                       @Nullable Collection<MasterState> masterHookStates,
                        CheckpointProperties props,
                        @Nullable StreamStateHandle externalizedMetadata,
                        @Nullable String externalPointer) {
@@ -156,7 +168,14 @@ public class CompletedCheckpoint implements Serializable {
                this.checkpointID = checkpointID;
                this.timestamp = timestamp;
                this.duration = completionTimestamp - timestamp;
-               this.taskStates = checkNotNull(taskStates);
+
+               // we create copies here, to make sure we have no shared mutable
+               // data structure with the "outside world"
+               this.taskStates = new HashMap<>(checkNotNull(taskStates));
+               this.masterHookStates = masterHookStates == null || 
masterHookStates.isEmpty() ?
+                               Collections.<MasterState>emptyList() :
+                               new ArrayList<>(masterHookStates);
+
                this.props = checkNotNull(props);
                this.externalizedMetadata = externalizedMetadata;
                this.externalPointer = externalPointer;
@@ -228,13 +247,17 @@ public class CompletedCheckpoint implements Serializable {
        }
 
        public Map<JobVertexID, TaskState> getTaskStates() {
-               return taskStates;
+               return Collections.unmodifiableMap(taskStates); // read-only view of the defensive copy
        }
 
        public TaskState getTaskState(JobVertexID jobVertexID) {
                return taskStates.get(jobVertexID);
        }
 
+       public Collection<MasterState> getMasterHookStates() {
+               return Collections.unmodifiableCollection(masterHookStates); // read-only view, never null
+       }
+
        public boolean isExternalized() {
                return externalizedMetadata != null;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterState.java
new file mode 100644
index 0000000..2d09fdb
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterState.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Simple encapsulation of state generated by a master hook in the checkpoint coordinator.
+ */
+public class MasterState implements java.io.Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       private final String name;
+       private final byte[] bytes;
+       private final int version;
+
+       public MasterState(String name, byte[] bytes, int version) {
+               this.name = checkNotNull(name);
+               this.bytes = checkNotNull(bytes);
+               this.version = version;
+       }
+
+       // ------------------------------------------------------------------------
+
+       public String name() {
+               return name;
+       }
+
+       public byte[] bytes() {
+               return bytes;
+       }
+
+       public int version() {
+               return version;
+       }
+
+       // ------------------------------------------------------------------------
+
+       @Override
+       public String toString() {
+               return "name: " + name + " ; version: " + version + " ; bytes: " + Arrays.toString(bytes);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
new file mode 100644
index 0000000..e77ed57
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.concurrent.Future;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.Executor;
+
+/**
+ * The interface for hooks that can be called by the checkpoint coordinator when triggering or
+ * restoring a checkpoint. Such a hook is useful for example when preparing external systems for
+ * taking or restoring checkpoints.
+ *
+ * <p>The {@link #triggerCheckpoint(long, long, Executor)} method (called when triggering a checkpoint)
+ * can return a result (via a future) that will be stored as part of the checkpoint metadata.
+ * When restoring a checkpoint, that stored result will be given to the {@link #restoreCheckpoint(long, Object)}
+ * method. The hook's {@link #getIdentifier() identifier} is used to map data to hook in the presence
+ * of multiple hooks, and when resuming a savepoint that was potentially created by a different job.
+ * The identifier has a similar role as for example the operator UID in the streaming API.
+ *
+ * <p>The MasterTriggerRestoreHook is defined when creating the streaming dataflow graph. It is attached
+ * to the job graph, which gets sent to the cluster for execution. To avoid having to make the hook
+ * itself serializable, these hooks are attached to the job graph via a {@link MasterTriggerRestoreHook.Factory}.
+ *
+ * @param <T> The type of the data produced by the hook and stored as part of the checkpoint metadata.
+ *            If the hook never stores any data, this can be typed to {@code Void}.
+ */
+public interface MasterTriggerRestoreHook<T> {
+
+       /**
+        * Gets the identifier of this hook. The identifier is used to identify a specific hook in the
+        * presence of multiple hooks and to give it the correct checkpointed data upon checkpoint restoration.
+        *
+        * <p>The identifier should be unique between different hooks of a job, but deterministic/constant
+        * so that upon resuming a savepoint, the hook will get the correct data.
+        * For example, if the hook calls into another storage system and persists namespace/schema specific
+        * information, then the name of the storage system, together with the namespace/schema name could
+        * be an appropriate identifier.
+        *
+        * <p>When multiple hooks of the same name are created and attached to a job graph, only the first
+        * one is actually used. This can be exploited to deduplicate hooks that would do the same thing.
+        *
+        * @return The identifier of the hook.
+        */
+       String getIdentifier();
+
+       /**
+        * This method is called by the checkpoint coordinator when triggering a checkpoint, prior
+        * to sending the "trigger checkpoint" messages to the source tasks.
+        *
+        * <p>If the hook implementation wants to store data as part of the checkpoint, it may return
+        * that data via a future, otherwise it should return null. The data is stored as part of
+        * the checkpoint metadata under the hook's identifier (see {@link #getIdentifier()}).
+        *
+        * <p>If the action by this hook needs to be executed synchronously, then this method should
+        * directly execute the action synchronously and block until it is complete. The returned future
+        * (if any) would typically be a completed future.
+        *
+        * <p>If the action should be executed asynchronously and only needs to complete before the
+        * checkpoint is considered completed, then the method may use the given executor to execute the
+        * actual action and would signal its completion by completing the future. For hooks that do not
+        * need to store data, the future would be completed with null.
+        *
+        * <p>A non-null result of the future is serialized with the serializer returned by
+        * {@link #createCheckpointDataSerializer()}, so that method must not return null in that case.
+        *
+        * @param checkpointId The ID (logical timestamp, monotonically increasing) of the checkpoint
+        * @param timestamp The wall clock timestamp when the checkpoint was triggered, for
+        *                  info/logging purposes.
+        * @param executor The executor for asynchronous actions
+        *
+        * @return Optionally, a future that signals when the hook has completed and that contains
+        *         data to be stored with the checkpoint.
+        *
+        * @throws Exception Exceptions encountered when calling the hook will cause the checkpoint to abort.
+        */
+       @Nullable
+       Future<T> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception;
+
+       /**
+        * This method is called by the checkpoint coordinator prior to restoring the state of a checkpoint.
+        * If the checkpoint did store data from this hook, that data will be passed to this method.
+        *
+        * @param checkpointId The ID (logical timestamp) of the restored checkpoint
+        * @param checkpointData The data originally stored in the checkpoint by this hook, possibly null.
+        *
+        * @throws Exception Exceptions thrown while restoring the checkpoint will cause the restore
+        *                   operation to fail and to possibly fall back to another checkpoint.
+        */
+       void restoreCheckpoint(long checkpointId, @Nullable T checkpointData) throws Exception;
+
+       /**
+        * Creates the serializer to (de)serialize the data stored by this hook. The serializer
+        * serializes the result of the Future returned by the {@link #triggerCheckpoint(long, long, Executor)}
+        * method, and deserializes the data stored in the checkpoint into the object passed to the
+        * {@link #restoreCheckpoint(long, Object)} method.
+        *
+        * <p>If the hook never returns any data to be stored, then this method may return null as the
+        * serializer.
+        *
+        * @return The serializer to (de)serialize the data stored by this hook, or null, if the hook
+        *         never stores any data.
+        */
+       @Nullable
+       SimpleVersionedSerializer<T> createCheckpointDataSerializer();
+
+       // ------------------------------------------------------------------------
+       //  factory
+       // ------------------------------------------------------------------------
+
+       /**
+        * A factory to instantiate a {@code MasterTriggerRestoreHook}.
+        *
+        * <p>The hooks are defined when creating the streaming dataflow graph and are attached
+        * to the job graph, which gets sent to the cluster for execution. To avoid having to make
+        * the hook implementation serializable, a serializable hook factory is actually attached to the
+        * job graph instead of the hook implementation itself.
+        */
+       interface Factory extends java.io.Serializable {
+
+               /**
+                * Instantiates the {@code MasterTriggerRestoreHook}.
+                *
+                * <p>Note: the type parameter {@code V} is chosen by the caller and is not checked
+                * against the data type that the created hook actually produces. Callers must use
+                * a type that matches the hook created by this factory.
+                *
+                * @param <V> The type of the data produced by the created hook
+                * @return The instantiated hook
+                */
+               <V> MasterTriggerRestoreHook<V> create();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 900331b..ce97edc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -41,8 +41,10 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
@@ -89,6 +91,8 @@ public class PendingCheckpoint {
 
        private final Map<ExecutionAttemptID, ExecutionVertex> 
notYetAcknowledgedTasks;
 
+       private final List<MasterState> masterState;
+
        /** Set of acknowledged tasks */
        private final Set<ExecutionAttemptID> acknowledgedTasks;
 
@@ -143,6 +147,7 @@ public class PendingCheckpoint {
                this.executor = Preconditions.checkNotNull(executor);
 
                this.taskStates = new HashMap<>();
+               this.masterState = new ArrayList<>();
                this.acknowledgedTasks = new 
HashSet<>(verticesToConfirm.size());
                this.onCompletionPromise = new FlinkCompletableFuture<>();
        }
@@ -256,7 +261,7 @@ public class PendingCheckpoint {
                        // make sure we fulfill the promise with an exception 
if something fails
                        try {
                                // externalize the metadata
-                               final Savepoint savepoint = new 
SavepointV1(checkpointId, taskStates.values());
+                               final Savepoint savepoint = new 
SavepointV2(checkpointId, taskStates.values());
 
                                // TEMP FIX - The savepoint store is strictly 
typed to file systems currently
                                //            but the checkpoints think more 
generic. we need to work with file handles
@@ -321,7 +326,8 @@ public class PendingCheckpoint {
                                checkpointId,
                                checkpointTimestamp,
                                System.currentTimeMillis(),
-                               new HashMap<>(taskStates),
+                               taskStates,
+                               masterState,
                                props,
                                externalMetadata,
                                externalPointer);
@@ -345,6 +351,17 @@ public class PendingCheckpoint {
        }
 
        /**
+        * Adds a master state to this pending checkpoint. Master state is state that is generated
+        * on the checkpoint coordinator (for example by checkpoint master hooks), as opposed to
+        * the subtask state that tasks report when acknowledging the checkpoint.
+        *
+        * @param state The state to add; must not be null
+        */
+       public void addMasterState(MasterState state) {
+               masterState.add(checkNotNull(state));
+       }
+
+       /**
         * Acknowledges the task with the given execution attempt id and the 
given subtask state.
         *
         * @param executionAttemptId of the acknowledged task

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
new file mode 100644
index 0000000..409019e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.hooks;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Collection of methods to deal with checkpoint master hooks.
+ */
+public final class MasterHooks {
+
+       // ------------------------------------------------------------------------
+       //  checkpoint triggering
+       // ------------------------------------------------------------------------
+
+       /**
+        * Triggers all given master hooks and returns state objects for each hook that
+        * produced a state.
+        *
+        * @param hooks The hooks to trigger
+        * @param checkpointId The checkpoint ID of the triggering checkpoint
+        * @param timestamp The (informational) timestamp for the triggering checkpoint
+        * @param executor An executor that can be used for asynchronous I/O calls
+        * @param timeout The maximum time that each hook may take to complete (hooks run one after another)
+        *
+        * @return A list containing all states produced by the hooks
+        *
+        * @throws FlinkException Thrown, if the hooks throw an exception, or the state
+        *                        serialization fails.
+        */
+       public static List<MasterState> triggerMasterHooks(
+                       Collection<MasterTriggerRestoreHook<?>> hooks,
+                       long checkpointId,
+                       long timestamp,
+                       Executor executor,
+                       Time timeout) throws FlinkException {
+
+               final ArrayList<MasterState> states = new ArrayList<>(hooks.size());
+
+               for (MasterTriggerRestoreHook<?> hook : hooks) {
+                       MasterState state = triggerHook(hook, checkpointId, timestamp, executor, timeout);
+                       if (state != null) {
+                               states.add(state);
+                       }
+               }
+
+               states.trimToSize();
+               return states;
+       }
+
+       /**
+        * Triggers a single hook, waits for its (optional) result future, and serializes a non-null
+        * result into a {@link MasterState} that is keyed by the hook's identifier.
+        *
+        * @return The serialized state of the hook, or null, if the hook returned no future or the
+        *         future completed with null.
+        *
+        * @throws FlinkException Thrown, if the hook throws an exception, does not complete within the
+        *                        timeout, the wait is interrupted, the hook produces a result but no
+        *                        serializer, or the serialization fails.
+        */
+       private static <T> MasterState triggerHook(
+                       MasterTriggerRestoreHook<?> hook,
+                       long checkpointId,
+                       long timestamp,
+                       Executor executor,
+                       Time timeout) throws FlinkException {
+
+               @SuppressWarnings("unchecked")
+               final MasterTriggerRestoreHook<T> typedHook = (MasterTriggerRestoreHook<T>) hook;
+
+               final String id = typedHook.getIdentifier();
+               final SimpleVersionedSerializer<T> serializer = typedHook.createCheckpointDataSerializer();
+
+               // call the hook!
+               final Future<T> resultFuture;
+               try {
+                       resultFuture = typedHook.triggerCheckpoint(checkpointId, timestamp, executor);
+               }
+               catch (Throwable t) {
+                       ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+                       throw new FlinkException("Error while triggering checkpoint master hook '" + id + '\'', t);
+               }
+
+               // if there is a result future, wait for its completion
+               // in the future we want to make this asynchronous with futures (no pun intended)
+               if (resultFuture == null) {
+                       return null;
+               }
+
+               final T result;
+               try {
+                       result = resultFuture.get(timeout.getSize(), timeout.getUnit());
+               }
+               catch (InterruptedException e) {
+                       // cannot continue here - restore interrupt status and leave
+                       Thread.currentThread().interrupt();
+                       throw new FlinkException("Checkpoint master hook '" + id + "' was interrupted", e);
+               }
+               catch (ExecutionException e) {
+                       throw new FlinkException("Checkpoint master hook '" + id + "' produced an exception", e.getCause());
+               }
+               catch (TimeoutException e) {
+                       throw new FlinkException("Checkpoint master hook '" + id +
+                                       "' did not complete in time (" + timeout + ')');
+               }
+
+               // only a non-null result of the future is stored as state
+               if (result == null) {
+                       return null;
+               }
+               if (serializer == null) {
+                       throw new FlinkException("Checkpoint hook '" + id + "' is stateful but creates no serializer");
+               }
+
+               try {
+                       final int version = serializer.getVersion();
+                       final byte[] bytes = serializer.serialize(result);
+
+                       return new MasterState(id, bytes, version);
+               }
+               catch (Throwable t) {
+                       ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+                       throw new FlinkException("Failed to serialize state of master hook '" + id + '\'', t);
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  checkpoint restoring
+       // ------------------------------------------------------------------------
+
+       /**
+        * Calls the restore method of the given checkpoint master hooks and passes the given master
+        * state to them where state with a matching name is found.
+        *
+        * <p>If state is found and no hook with the same name is found, the method throws an
+        * exception, unless the {@code allowUnmatchedState} flag is set. This also applies when
+        * no hooks are given at all.
+        *
+        * <p>If there is no state at all, no hook is called. Otherwise, hooks without matching
+        * state are called with {@code null} state after all hooks with state have been restored.
+        *
+        * @param masterHooks The hooks to call restore on, keyed by hook identifier
+        * @param states The state to pass to the hooks
+        * @param checkpointId The checkpoint ID of the restored checkpoint
+        * @param allowUnmatchedState True, to drop (and log) state for which no hook is found,
+        *                            false, to fail with an exception in that case
+        * @param log The logger for log messages
+        *
+        * @throws FlinkException Thrown, if the hooks throw an exception, or the state
+        *                        deserialization fails.
+        * @throws IllegalStateException Thrown, if state without a matching hook is found and
+        *                               {@code allowUnmatchedState} is false.
+        */
+       public static void restoreMasterHooks(
+                       final Map<String, MasterTriggerRestoreHook<?>> masterHooks,
+                       final Collection<MasterState> states,
+                       final long checkpointId,
+                       final boolean allowUnmatchedState,
+                       final Logger log) throws FlinkException {
+
+               // early out - without any state there is nothing to restore
+               if (states == null || states.isEmpty()) {
+                       log.info("No master state to restore");
+                       return;
+               }
+
+               log.info("Calling master restore hooks");
+
+               // collect the hooks (there may be none, in which case all state is unmatched)
+               final LinkedHashMap<String, MasterTriggerRestoreHook<?>> allHooks = new LinkedHashMap<>();
+               if (masterHooks != null) {
+                       allHooks.putAll(masterHooks);
+               }
+
+               // first, deserialize all hook state
+               final ArrayList<Tuple2<MasterTriggerRestoreHook<?>, Object>> hooksAndStates = new ArrayList<>();
+
+               for (MasterState state : states) {
+                       if (state != null) {
+                               final String name = state.name();
+                               final MasterTriggerRestoreHook<?> hook = allHooks.remove(name);
+
+                               if (hook != null) {
+                                       log.debug("Found state to restore for hook '{}'", name);
+
+                                       Object deserializedState = deserializeState(state, hook);
+                                       hooksAndStates.add(new Tuple2<MasterTriggerRestoreHook<?>, Object>(hook, deserializedState));
+                               }
+                               else if (!allowUnmatchedState) {
+                                       throw new IllegalStateException("Found state '" + state.name() +
+                                                       "' which is not resumed by any hook.");
+                               }
+                               else {
+                                       log.info("Dropping unmatched state from '{}'", name);
+                               }
+                       }
+               }
+
+               // now that all is deserialized, call the hooks
+               for (Tuple2<MasterTriggerRestoreHook<?>, Object> hookAndState : hooksAndStates) {
+                       restoreHook(hookAndState.f1, hookAndState.f0, checkpointId);
+               }
+
+               // trigger the remaining hooks without checkpointed state
+               for (MasterTriggerRestoreHook<?> hook : allHooks.values()) {
+                       restoreHook(null, hook, checkpointId);
+               }
+       }
+
+       /**
+        * Deserializes the given master state with the serializer created by the given hook.
+        *
+        * @throws FlinkException Thrown, if the hook creates no serializer, or the deserialization fails.
+        */
+       private static <T> T deserializeState(MasterState state, MasterTriggerRestoreHook<?> hook) throws FlinkException {
+               @SuppressWarnings("unchecked")
+               final MasterTriggerRestoreHook<T> typedHook = (MasterTriggerRestoreHook<T>) hook;
+               final String id = hook.getIdentifier();
+
+               try {
+                       final SimpleVersionedSerializer<T> deserializer = typedHook.createCheckpointDataSerializer();
+                       if (deserializer == null) {
+                               throw new FlinkException("null serializer for state of hook " + id);
+                       }
+
+                       return deserializer.deserialize(state.version(), state.bytes());
+               }
+               catch (Throwable t) {
+                       // do not wrap fatal JVM errors or OOMs, consistent with triggering the hooks
+                       ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+                       throw new FlinkException("Cannot deserialize state for master hook '" + id + '\'', t);
+               }
+       }
+
+       /**
+        * Calls {@link MasterTriggerRestoreHook#restoreCheckpoint(long, Object)} on the given hook
+        * with the given (possibly null) state.
+        *
+        * @throws FlinkException Thrown, if the hook throws an exception.
+        */
+       private static <T> void restoreHook(
+                       final Object state,
+                       final MasterTriggerRestoreHook<?> hook,
+                       final long checkpointId) throws FlinkException {
+
+               @SuppressWarnings("unchecked")
+               final T typedState = (T) state;
+
+               @SuppressWarnings("unchecked")
+               final MasterTriggerRestoreHook<T> typedHook = (MasterTriggerRestoreHook<T>) hook;
+
+               try {
+                       typedHook.restoreCheckpoint(checkpointId, typedState);
+               }
+               catch (FlinkException e) {
+                       throw e;
+               }
+               catch (Throwable t) {
+                       // catch all here, including Errors that may come from dependency and classpath issues
+                       ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+                       throw new FlinkException("Error while calling restoreCheckpoint on checkpoint hook '"
+                                       + hook.getIdentifier() + '\'', t);
+               }
+       }
+
+       // ------------------------------------------------------------------------
+
+       /** This class is not meant to be instantiated */
+       private MasterHooks() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
index baad05f..79ec596 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint.savepoint;
 
 import org.apache.flink.core.io.Versioned;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.TaskState;
 
 import java.util.Collection;
@@ -58,6 +59,11 @@ public interface Savepoint extends Versioned {
        Collection<TaskState> getTaskStates();
 
        /**
+        * Gets the checkpointed states generated by the master (checkpoint coordinator), e.g. by master hooks.
+        */
+       Collection<MasterState> getMasterStates();
+
+       /**
         * Disposes the savepoint.
         */
        void dispose() throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index 60f0287..8ee38da 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StreamStateHandle;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -122,7 +123,7 @@ public class SavepointLoader {
                // (3) convert to checkpoint so the system can fall back to it
                CheckpointProperties props = 
CheckpointProperties.forStandardSavepoint();
                return new CompletedCheckpoint(jobId, 
savepoint.getCheckpointId(), 0L, 0L,
-                               taskStates, props, metadataHandle, 
savepointPath);
+                               taskStates, savepoint.getMasterStates(), props, 
metadataHandle, savepointPath);
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
index 3155d60..c1fcf4f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0;
 import 
org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer;
 import org.apache.flink.util.Preconditions;
 
@@ -30,14 +31,16 @@ import java.util.Map;
 public class SavepointSerializers {
 
 
-       private static final int SAVEPOINT_VERSION_0 = 0;
        private static final Map<Integer, SavepointSerializer<?>> SERIALIZERS = 
new HashMap<>(2);
 
        static {
-               SERIALIZERS.put(SAVEPOINT_VERSION_0, 
SavepointV0Serializer.INSTANCE);
+               SERIALIZERS.put(SavepointV0.VERSION, 
SavepointV0Serializer.INSTANCE);
                SERIALIZERS.put(SavepointV1.VERSION, 
SavepointV1Serializer.INSTANCE);
+               SERIALIZERS.put(SavepointV2.VERSION, 
SavepointV2Serializer.INSTANCE);
        }
 
+       // 
------------------------------------------------------------------------
+
        /**
         * Returns the {@link SavepointSerializer} for the given savepoint.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
index 5976bbf..196c870 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.util.Preconditions;
 
@@ -60,36 +61,21 @@ public class SavepointV1 implements Savepoint {
        }
 
        @Override
-       public void dispose() throws Exception {
-               for (TaskState taskState : taskStates) {
-                       taskState.discardState();
-               }
-               taskStates.clear();
-       }
-
-       @Override
-       public String toString() {
-               return "Savepoint(version=" + VERSION + ")";
+       public Collection<MasterState> getMasterStates() {
+               // since checkpoints are never deserialized into this format,
+               // this method should never be called
+               throw new UnsupportedOperationException();
        }
 
        @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               SavepointV1 that = (SavepointV1) o;
-               return checkpointId == that.checkpointId && 
getTaskStates().equals(that.getTaskStates());
+       public void dispose() throws Exception {
+               // since checkpoints are never deserialized into this format,
+               // this method should never be called
+               throw new UnsupportedOperationException();
        }
 
        @Override
-       public int hashCode() {
-               int result = (int) (checkpointId ^ (checkpointId >>> 32));
-               result = 31 * result + taskStates.hashCode();
-               return result;
+       public String toString() {
+               return "Savepoint(version=" + VERSION + ")";
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index 44461d8..ae9f4a9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint.savepoint;
 
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -37,18 +38,19 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Serializer for {@link SavepointV1} instances.
- * <p>
- * <p>In contrast to previous savepoint versions, this serializer makes sure
- * that no default Java serialization is used for serialization. Therefore, we
- * don't rely on any involved Java classes to stay the same.
+ * Deserializer for checkpoints written in format {@code 1} (Flink 1.2.x 
format)
+ * 
+ * <p>In contrast to the previous versions, this serializer makes sure that no 
Java
+ * serialization is used for serialization. Therefore, we don't rely on any 
involved 
+ * classes to stay the same.
  */
-class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
+class SavepointV1Serializer implements SavepointSerializer<SavepointV2> {
 
        private static final byte NULL_HANDLE = 0;
        private static final byte BYTE_STREAM_STATE_HANDLE = 1;
@@ -63,39 +65,12 @@ class SavepointV1Serializer implements 
SavepointSerializer<SavepointV1> {
        }
 
        @Override
-       public void serialize(SavepointV1 savepoint, DataOutputStream dos) 
throws IOException {
-               try {
-                       dos.writeLong(savepoint.getCheckpointId());
-
-                       Collection<TaskState> taskStates = 
savepoint.getTaskStates();
-                       dos.writeInt(taskStates.size());
-
-                       for (TaskState taskState : savepoint.getTaskStates()) {
-                               // Vertex ID
-                               
dos.writeLong(taskState.getJobVertexID().getLowerPart());
-                               
dos.writeLong(taskState.getJobVertexID().getUpperPart());
-
-                               // Parallelism
-                               int parallelism = taskState.getParallelism();
-                               dos.writeInt(parallelism);
-                               dos.writeInt(taskState.getMaxParallelism());
-                               dos.writeInt(taskState.getChainLength());
-
-                               // Sub task states
-                               Map<Integer, SubtaskState> subtaskStateMap = 
taskState.getSubtaskStates();
-                               dos.writeInt(subtaskStateMap.size());
-                               for (Map.Entry<Integer, SubtaskState> entry : 
subtaskStateMap.entrySet()) {
-                                       dos.writeInt(entry.getKey());
-                                       serializeSubtaskState(entry.getValue(), 
dos);
-                               }
-                       }
-               } catch (Exception e) {
-                       throw new IOException(e);
-               }
+       public void serialize(SavepointV2 savepoint, DataOutputStream dos) 
throws IOException {
+               throw new UnsupportedOperationException("This serializer is 
read-only and only exists for backwards compatibility");
        }
 
        @Override
-       public SavepointV1 deserialize(DataInputStream dis, ClassLoader cl) 
throws IOException {
+       public SavepointV2 deserialize(DataInputStream dis, ClassLoader cl) 
throws IOException {
                long checkpointId = dis.readLong();
 
                // Task states
@@ -122,7 +97,34 @@ class SavepointV1Serializer implements 
SavepointSerializer<SavepointV1> {
                        }
                }
 
-               return new SavepointV1(checkpointId, taskStates);
+               return new SavepointV2(checkpointId, taskStates, 
Collections.<MasterState>emptyList());
+       }
+
+       public void serializeOld(SavepointV1 savepoint, DataOutputStream dos) 
throws IOException {
+               dos.writeLong(savepoint.getCheckpointId());
+
+               Collection<TaskState> taskStates = savepoint.getTaskStates();
+               dos.writeInt(taskStates.size());
+
+               for (TaskState taskState : savepoint.getTaskStates()) {
+                       // Vertex ID
+                       
dos.writeLong(taskState.getJobVertexID().getLowerPart());
+                       
dos.writeLong(taskState.getJobVertexID().getUpperPart());
+
+                       // Parallelism
+                       int parallelism = taskState.getParallelism();
+                       dos.writeInt(parallelism);
+                       dos.writeInt(taskState.getMaxParallelism());
+                       dos.writeInt(taskState.getChainLength());
+
+                       // Sub task states
+                       Map<Integer, SubtaskState> subtaskStateMap = 
taskState.getSubtaskStates();
+                       dos.writeInt(subtaskStateMap.size());
+                       for (Map.Entry<Integer, SubtaskState> entry : 
subtaskStateMap.entrySet()) {
+                               dos.writeInt(entry.getKey());
+                               serializeSubtaskState(entry.getValue(), dos);
+                       }
+               }
        }
 
        private static void serializeSubtaskState(SubtaskState subtaskState, 
DataOutputStream dos) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
new file mode 100644
index 0000000..100982d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The persistent checkpoint metadata, format version 2.
+ * This format was introduced with Flink 1.3.0.
+ *
+ * <p>In addition to the task states, this format carries the {@link MasterState master states},
+ * i.e. the states generated by the CheckpointCoordinator.
+ */
+public class SavepointV2 implements Savepoint {
+
+       /** The savepoint version. */
+       public static final int VERSION = 2;
+
+       /** The checkpoint ID. */
+       private final long checkpointId;
+
+       /** The task states. */
+       private final Collection<TaskState> taskStates;
+
+       /** The states generated by the CheckpointCoordinator. */
+       private final Collection<MasterState> masterStates;
+
+       /**
+        * Creates checkpoint metadata that contains no master states.
+        *
+        * @param checkpointId The ID of the checkpoint.
+        * @param taskStates The task states; must not be null.
+        */
+       public SavepointV2(long checkpointId, Collection<TaskState> taskStates) {
+               this(checkpointId, taskStates, Collections.<MasterState>emptyList());
+       }
+
+       /**
+        * Creates checkpoint metadata with task states and master states.
+        *
+        * <p>The given collections are stored as-is (not copied); {@link #dispose()} clears them.
+        *
+        * @param checkpointId The ID of the checkpoint.
+        * @param taskStates The task states; must not be null.
+        * @param masterStates The master states; must not be null.
+        * @throws NullPointerException if {@code taskStates} or {@code masterStates} is null.
+        */
+       public SavepointV2(long checkpointId, Collection<TaskState> taskStates, Collection<MasterState> masterStates) {
+               this.checkpointId = checkpointId;
+               this.taskStates = checkNotNull(taskStates, "taskStates");
+               this.masterStates = checkNotNull(masterStates, "masterStates");
+       }
+
+       @Override
+       public int getVersion() {
+               return VERSION;
+       }
+
+       @Override
+       public long getCheckpointId() {
+               return checkpointId;
+       }
+
+       @Override
+       public Collection<TaskState> getTaskStates() {
+               return taskStates;
+       }
+
+       @Override
+       public Collection<MasterState> getMasterStates() {
+               return masterStates;
+       }
+
+       /**
+        * Discards the state of every task state, then clears both the task state and the
+        * master state collections.
+        */
+       @Override
+       public void dispose() throws Exception {
+               for (TaskState taskState : taskStates) {
+                       taskState.discardState();
+               }
+               taskStates.clear();
+               masterStates.clear();
+       }
+
+       @Override
+       public String toString() {
+               return "Checkpoint Metadata (version=" + VERSION + ')';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
new file mode 100644
index 0000000..307ea16
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * (De)serializer for checkpoint metadata format version 2.
+ *
+ * <p>This format version adds the {@link MasterState master states}, which are written
+ * between the checkpoint ID and the task (operator) states.
+ *
+ * <p>Basic checkpoint metadata layout:
+ * <pre>
+ *  +--------------+---------------+-----------------+
+ *  | checkpointID | master states | operator states |
+ *  +--------------+---------------+-----------------+
+ *
+ *  Master state:
+ *  +--------------+---------------------+---------+------+---------------+
+ *  | magic number | num remaining bytes | version | name | payload bytes |
+ *  +--------------+---------------------+---------+------+---------------+
+ * </pre>
+ *
+ * <p>The master states and the operator states are each preceded by an {@code int} count,
+ * and the payload bytes of a master state are preceded by their {@code int} length.
+ */
+class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
+
+       /** Random magic number for consistency checks. */
+       private static final int MASTER_STATE_MAGIC_NUMBER = 0xc96b1696;
+
+       private static final byte NULL_HANDLE = 0;
+       private static final byte BYTE_STREAM_STATE_HANDLE = 1;
+       private static final byte FILE_STREAM_STATE_HANDLE = 2;
+       private static final byte KEY_GROUPS_HANDLE = 3;
+       private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
+
+       /** The singleton instance of the serializer. */
+       public static final SavepointV2Serializer INSTANCE = new SavepointV2Serializer();
+
+       // ------------------------------------------------------------------------
+
+       /** Singleton, not meant to be instantiated. */
+       private SavepointV2Serializer() {}
+
+       // ------------------------------------------------------------------------
+       //  (De)serialization entry points
+       // ------------------------------------------------------------------------
+
+       /**
+        * Writes the checkpoint ID, then the master states, then the task states.
+        */
+       @Override
+       public void serialize(SavepointV2 checkpointMetadata, DataOutputStream dos) throws IOException {
+               // first: checkpoint ID
+               dos.writeLong(checkpointMetadata.getCheckpointId());
+
+               // second: master state
+               final Collection<MasterState> masterStates = checkpointMetadata.getMasterStates();
+               dos.writeInt(masterStates.size());
+               for (MasterState ms : masterStates) {
+                       serializeMasterState(ms, dos);
+               }
+
+               // third: task states
+               final Collection<TaskState> taskStates = checkpointMetadata.getTaskStates();
+               dos.writeInt(taskStates.size());
+
+               for (TaskState taskState : taskStates) {
+                       // Vertex ID
+                       dos.writeLong(taskState.getJobVertexID().getLowerPart());
+                       dos.writeLong(taskState.getJobVertexID().getUpperPart());
+
+                       // Parallelism
+                       int parallelism = taskState.getParallelism();
+                       dos.writeInt(parallelism);
+                       dos.writeInt(taskState.getMaxParallelism());
+                       dos.writeInt(taskState.getChainLength());
+
+                       // Sub task states
+                       Map<Integer, SubtaskState> subtaskStateMap = taskState.getSubtaskStates();
+                       dos.writeInt(subtaskStateMap.size());
+                       for (Map.Entry<Integer, SubtaskState> entry : subtaskStateMap.entrySet()) {
+                               dos.writeInt(entry.getKey());
+                               serializeSubtaskState(entry.getValue(), dos);
+                       }
+               }
+       }
+
+       /**
+        * Reads metadata in the layout written by {@link #serialize(SavepointV2, DataOutputStream)}.
+        *
+        * @throws IOException if the stream ends early or contains invalid IDs, counts, or lengths.
+        */
+       @Override
+       public SavepointV2 deserialize(DataInputStream dis, ClassLoader cl) throws IOException {
+               // first: checkpoint ID
+               final long checkpointId = dis.readLong();
+               if (checkpointId < 0) {
+                       throw new IOException("invalid checkpoint ID: " + checkpointId);
+               }
+
+               // second: master state
+               final int numMasterStates = readCount(dis, "number of master states");
+               final List<MasterState> masterStates;
+
+               if (numMasterStates == 0) {
+                       masterStates = Collections.emptyList();
+               }
+               else {
+                       masterStates = new ArrayList<>(numMasterStates);
+                       for (int i = 0; i < numMasterStates; i++) {
+                               masterStates.add(deserializeMasterState(dis));
+                       }
+               }
+
+               // third: task states
+               final int numTaskStates = readCount(dis, "number of task states");
+               final ArrayList<TaskState> taskStates = new ArrayList<>(numTaskStates);
+
+               for (int i = 0; i < numTaskStates; i++) {
+                       JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong());
+                       int parallelism = dis.readInt();
+                       int maxParallelism = dis.readInt();
+                       int chainLength = dis.readInt();
+
+                       // Add task state
+                       TaskState taskState = new TaskState(jobVertexId, parallelism, maxParallelism, chainLength);
+                       taskStates.add(taskState);
+
+                       // Sub task states; a negative count would otherwise be silently treated as zero
+                       int numSubTaskStates = readCount(dis, "number of subtask states");
+
+                       for (int j = 0; j < numSubTaskStates; j++) {
+                               int subtaskIndex = dis.readInt();
+                               SubtaskState subtaskState = deserializeSubtaskState(dis);
+                               taskState.putState(subtaskIndex, subtaskState);
+                       }
+               }
+
+               return new SavepointV2(checkpointId, taskStates, masterStates);
+       }
+
+       // ------------------------------------------------------------------------
+       //  master state (de)serialization methods
+       // ------------------------------------------------------------------------
+
+       /**
+        * Writes the magic number, then the length-prefixed block of version, name and payload.
+        */
+       private static void serializeMasterState(MasterState state, DataOutputStream dos) throws IOException {
+               // magic number for error detection
+               dos.writeInt(MASTER_STATE_MAGIC_NUMBER);
+
+               // for safety, we serialize first into an array and then write the array and its
+               // length into the checkpoint
+               final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               final DataOutputStream out = new DataOutputStream(baos);
+
+               out.writeInt(state.version());
+               out.writeUTF(state.name());
+
+               final byte[] bytes = state.bytes();
+               out.writeInt(bytes.length);
+               out.write(bytes, 0, bytes.length);
+
+               out.close();
+               byte[] data = baos.toByteArray();
+
+               dos.writeInt(data.length);
+               dos.write(data, 0, data.length);
+       }
+
+       /**
+        * Reads one master state written by {@link #serializeMasterState(MasterState, DataOutputStream)}.
+        *
+        * @throws IOException on a wrong magic number, invalid lengths, or trailing bytes.
+        */
+       private static MasterState deserializeMasterState(DataInputStream dis) throws IOException {
+               final int magicNumber = dis.readInt();
+               if (magicNumber != MASTER_STATE_MAGIC_NUMBER) {
+                       throw new IOException("incorrect magic number in master state byte sequence");
+               }
+
+               final int numBytes = dis.readInt();
+               if (numBytes <= 0) {
+                       throw new IOException("found zero or negative length for master state bytes");
+               }
+
+               final byte[] data = new byte[numBytes];
+               dis.readFully(data);
+
+               final DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
+
+               final int version = in.readInt();
+               final String name = in.readUTF();
+
+               // guard against NegativeArraySizeException on corrupt data
+               final byte[] bytes = new byte[readCount(in, "master state payload length")];
+               in.readFully(bytes);
+
+               // check that the data is not corrupt
+               if (in.read() != -1) {
+                       throw new IOException("found trailing bytes in master state");
+               }
+
+               return new MasterState(name, bytes, version);
+       }
+
+       // ------------------------------------------------------------------------
+       //  task state (de)serialization methods
+       // ------------------------------------------------------------------------
+
+       private static void serializeSubtaskState(SubtaskState subtaskState, DataOutputStream dos) throws IOException {
+
+               // placeholder for the removed duration field (skipped on read)
+               dos.writeLong(-1);
+
+               ChainedStateHandle<StreamStateHandle> nonPartitionableState = subtaskState.getLegacyOperatorState();
+
+               int len = nonPartitionableState != null ? nonPartitionableState.getLength() : 0;
+               dos.writeInt(len);
+               for (int i = 0; i < len; ++i) {
+                       StreamStateHandle stateHandle = nonPartitionableState.get(i);
+                       serializeStreamStateHandle(stateHandle, dos);
+               }
+
+               ChainedStateHandle<OperatorStateHandle> operatorStateBackend = subtaskState.getManagedOperatorState();
+
+               len = operatorStateBackend != null ? operatorStateBackend.getLength() : 0;
+               dos.writeInt(len);
+               for (int i = 0; i < len; ++i) {
+                       OperatorStateHandle stateHandle = operatorStateBackend.get(i);
+                       serializeOperatorStateHandle(stateHandle, dos);
+               }
+
+               ChainedStateHandle<OperatorStateHandle> operatorStateFromStream = subtaskState.getRawOperatorState();
+
+               len = operatorStateFromStream != null ? operatorStateFromStream.getLength() : 0;
+               dos.writeInt(len);
+               for (int i = 0; i < len; ++i) {
+                       OperatorStateHandle stateHandle = operatorStateFromStream.get(i);
+                       serializeOperatorStateHandle(stateHandle, dos);
+               }
+
+               KeyedStateHandle keyedStateBackend = subtaskState.getManagedKeyedState();
+               serializeKeyedStateHandle(keyedStateBackend, dos);
+
+               KeyedStateHandle keyedStateStream = subtaskState.getRawKeyedState();
+               serializeKeyedStateHandle(keyedStateStream, dos);
+       }
+
+       private static SubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException {
+               // Duration field has been removed from SubtaskState
+               long ignoredDuration = dis.readLong();
+
+               int len = readCount(dis, "number of legacy operator state handles");
+               List<StreamStateHandle> nonPartitionableState = new ArrayList<>(len);
+               for (int i = 0; i < len; ++i) {
+                       StreamStateHandle streamStateHandle = deserializeStreamStateHandle(dis);
+                       nonPartitionableState.add(streamStateHandle);
+               }
+
+               len = readCount(dis, "number of managed operator state handles");
+               List<OperatorStateHandle> operatorStateBackend = new ArrayList<>(len);
+               for (int i = 0; i < len; ++i) {
+                       OperatorStateHandle streamStateHandle = deserializeOperatorStateHandle(dis);
+                       operatorStateBackend.add(streamStateHandle);
+               }
+
+               len = readCount(dis, "number of raw operator state handles");
+               List<OperatorStateHandle> operatorStateStream = new ArrayList<>(len);
+               for (int i = 0; i < len; ++i) {
+                       OperatorStateHandle streamStateHandle = deserializeOperatorStateHandle(dis);
+                       operatorStateStream.add(streamStateHandle);
+               }
+
+               KeyedStateHandle keyedStateBackend = deserializeKeyedStateHandle(dis);
+
+               KeyedStateHandle keyedStateStream = deserializeKeyedStateHandle(dis);
+
+               ChainedStateHandle<StreamStateHandle> nonPartitionableStateChain =
+                               new ChainedStateHandle<>(nonPartitionableState);
+
+               ChainedStateHandle<OperatorStateHandle> operatorStateBackendChain =
+                               new ChainedStateHandle<>(operatorStateBackend);
+
+               ChainedStateHandle<OperatorStateHandle> operatorStateStreamChain =
+                               new ChainedStateHandle<>(operatorStateStream);
+
+               return new SubtaskState(
+                               nonPartitionableStateChain,
+                               operatorStateBackendChain,
+                               operatorStateStreamChain,
+                               keyedStateBackend,
+                               keyedStateStream);
+       }
+
+       private static void serializeKeyedStateHandle(
+                       KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
+               if (stateHandle == null) {
+                       dos.writeByte(NULL_HANDLE);
+               } else if (stateHandle instanceof KeyGroupsStateHandle) {
+                       KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) stateHandle;
+
+                       dos.writeByte(KEY_GROUPS_HANDLE);
+                       dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getStartKeyGroup());
+                       dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
+                       for (int keyGroup : keyGroupsStateHandle.getKeyGroupRange()) {
+                               dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup));
+                       }
+                       serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos);
+               } else {
+                       throw new IllegalStateException("Unknown KeyedStateHandle type: " + stateHandle.getClass());
+               }
+       }
+
+       private static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis) throws IOException {
+               final int type = dis.readByte();
+               if (NULL_HANDLE == type) {
+                       return null;
+               } else if (KEY_GROUPS_HANDLE == type) {
+                       int startKeyGroup = dis.readInt();
+                       int numKeyGroups = readCount(dis, "number of key groups");
+                       KeyGroupRange keyGroupRange = KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
+                       long[] offsets = new long[numKeyGroups];
+                       for (int i = 0; i < numKeyGroups; ++i) {
+                               offsets[i] = dis.readLong();
+                       }
+                       KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(
+                               keyGroupRange, offsets);
+                       StreamStateHandle stateHandle = deserializeStreamStateHandle(dis);
+                       return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+               } else {
+                       throw new IllegalStateException("Reading invalid KeyedStateHandle, type: " + type);
+               }
+       }
+
+       private static void serializeOperatorStateHandle(
+                       OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
+               if (stateHandle != null) {
+                       dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE);
+                       Map<String, OperatorStateHandle.StateMetaInfo> partitionOffsetsMap =
+                                       stateHandle.getStateNameToPartitionOffsets();
+                       dos.writeInt(partitionOffsetsMap.size());
+                       for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> entry : partitionOffsetsMap.entrySet()) {
+                               dos.writeUTF(entry.getKey());
+
+                               OperatorStateHandle.StateMetaInfo stateMetaInfo = entry.getValue();
+
+                               int mode = stateMetaInfo.getDistributionMode().ordinal();
+                               dos.writeByte(mode);
+
+                               long[] offsets = stateMetaInfo.getOffsets();
+                               dos.writeInt(offsets.length);
+                               for (long offset : offsets) {
+                                       dos.writeLong(offset);
+                               }
+                       }
+                       serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
+               } else {
+                       dos.writeByte(NULL_HANDLE);
+               }
+       }
+
+       private static OperatorStateHandle deserializeOperatorStateHandle(
+                       DataInputStream dis) throws IOException {
+
+               final int type = dis.readByte();
+               if (NULL_HANDLE == type) {
+                       return null;
+               } else if (PARTITIONABLE_OPERATOR_STATE_HANDLE == type) {
+                       int mapSize = readCount(dis, "number of operator state partitions");
+                       Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<>(mapSize);
+                       final OperatorStateHandle.Mode[] modes = OperatorStateHandle.Mode.values();
+                       for (int i = 0; i < mapSize; ++i) {
+                               String key = dis.readUTF();
+
+                               int modeOrdinal = dis.readByte();
+                               // reject corrupt ordinals instead of failing with an ArrayIndexOutOfBoundsException
+                               if (modeOrdinal < 0 || modeOrdinal >= modes.length) {
+                                       throw new IOException("invalid operator state distribution mode: " + modeOrdinal);
+                               }
+                               OperatorStateHandle.Mode mode = modes[modeOrdinal];
+
+                               long[] offsets = new long[readCount(dis, "number of partition offsets")];
+                               for (int j = 0; j < offsets.length; ++j) {
+                                       offsets[j] = dis.readLong();
+                               }
+
+                               OperatorStateHandle.StateMetaInfo metaInfo =
+                                               new OperatorStateHandle.StateMetaInfo(offsets, mode);
+                               offsetsMap.put(key, metaInfo);
+                       }
+                       StreamStateHandle stateHandle = deserializeStreamStateHandle(dis);
+                       return new OperatorStateHandle(offsetsMap, stateHandle);
+               } else {
+                       throw new IllegalStateException("Reading invalid OperatorStateHandle, type: " + type);
+               }
+       }
+
+       private static void serializeStreamStateHandle(
+                       StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
+               if (stateHandle == null) {
+                       dos.writeByte(NULL_HANDLE);
+
+               } else if (stateHandle instanceof FileStateHandle) {
+                       dos.writeByte(FILE_STREAM_STATE_HANDLE);
+                       FileStateHandle fileStateHandle = (FileStateHandle) stateHandle;
+                       dos.writeLong(stateHandle.getStateSize());
+                       dos.writeUTF(fileStateHandle.getFilePath().toString());
+
+               } else if (stateHandle instanceof ByteStreamStateHandle) {
+                       dos.writeByte(BYTE_STREAM_STATE_HANDLE);
+                       ByteStreamStateHandle byteStreamStateHandle = (ByteStreamStateHandle) stateHandle;
+                       dos.writeUTF(byteStreamStateHandle.getHandleName());
+                       byte[] internalData = byteStreamStateHandle.getData();
+                       dos.writeInt(internalData.length);
+                       dos.write(internalData);
+
+               } else {
+                       throw new IOException("Unknown implementation of StreamStateHandle: " + stateHandle.getClass());
+               }
+
+               dos.flush();
+       }
+
+       private static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
+               final int type = dis.read();
+               if (NULL_HANDLE == type) {
+                       return null;
+               } else if (FILE_STREAM_STATE_HANDLE == type) {
+                       long size = dis.readLong();
+                       String pathString = dis.readUTF();
+                       return new FileStateHandle(new Path(pathString), size);
+               } else if (BYTE_STREAM_STATE_HANDLE == type) {
+                       String handleName = dis.readUTF();
+                       int numBytes = readCount(dis, "byte stream state handle length");
+                       byte[] data = new byte[numBytes];
+                       dis.readFully(data);
+                       return new ByteStreamStateHandle(handleName, data);
+               } else {
+                       throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  utilities
+       // ------------------------------------------------------------------------
+
+       /**
+        * Reads an {@code int} that denotes a count or a length and rejects negative values.
+        * The serializer only ever writes sizes and lengths here, so a negative value means the
+        * data is corrupt. Failing with an {@link IOException} is preferable to the unchecked
+        * exceptions that {@code new ArrayList<>(-1)} or {@code new byte[-1]} would throw.
+        *
+        * @param in The stream to read from.
+        * @param what A description of the value, used in the error message.
+        * @return The non-negative value read.
+        * @throws IOException if the stream ends early or the value is negative.
+        */
+       private static int readCount(DataInputStream in, String what) throws IOException {
+               final int count = in.readInt();
+               if (count < 0) {
+                       throw new IOException("invalid " + what + ": " + count);
+               }
+               return count;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 29b9806..23ed99d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -360,6 +361,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        List<ExecutionJobVertex> verticesToTrigger,
                        List<ExecutionJobVertex> verticesToWaitFor,
                        List<ExecutionJobVertex> verticesToCommitTo,
+                       List<MasterTriggerRestoreHook<?>> masterHooks,
                        CheckpointIDCounter checkpointIDCounter,
                        CompletedCheckpointStore checkpointStore,
                        String checkpointDir,
@@ -395,6 +397,13 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        checkpointDir,
                        ioExecutor);
 
+               // register the master hooks on the checkpoint coordinator
+               for (MasterTriggerRestoreHook<?> hook : masterHooks) {
+                       if (!checkpointCoordinator.addMasterHook(hook)) {
+                               LOG.warn("Trying to register multiple 
checkpoint hooks with the name: {}", hook.getIdentifier());
+                       }
+               }
+
                
checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
 
                // interval of max long value indicates disable periodic 
checkpoint,

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index a10c62e..b40817f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
@@ -51,6 +52,7 @@ import org.slf4j.Logger;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
@@ -230,6 +232,21 @@ public class ExecutionGraphBuilder {
                                }
                        }
 
+                       // instantiate the user-defined checkpoint hooks
+
+                       final MasterTriggerRestoreHook.Factory[] hookFactories 
= snapshotSettings.getMasterHooks();
+                       final List<MasterTriggerRestoreHook<?>> hooks;
+
+                       if (hookFactories == null || hookFactories.length == 0) 
{
+                               hooks = Collections.emptyList();
+                       }
+                       else {
+                               hooks = new ArrayList<>(hookFactories.length);
+                               for (MasterTriggerRestoreHook.Factory factory : 
hookFactories) {
+                                       hooks.add(factory.create());
+                               }
+                       }
+
                        executionGraph.enableCheckpointing(
                                        
snapshotSettings.getCheckpointInterval(),
                                        snapshotSettings.getCheckpointTimeout(),
@@ -239,6 +256,7 @@ public class ExecutionGraphBuilder {
                                        triggerVertices,
                                        ackVertices,
                                        confirmVertices,
+                                       hooks,
                                        checkpointIdCounter,
                                        completedCheckpoints,
                                        externalizedCheckpointsDir,

Reply via email to