[FLINK-5763] [checkpoints] Add CheckpointOptions

Adds `CheckpointOptions` to the triggered checkpoint messages (coordinator
to barrier injecting tasks) and barriers (flowing inline with the data:

```java
public class CheckpointOptions {

  // Type of checkpoint
  // => FULL_CHECKPOINT
  // => SAVEPOINT
  @NonNull
  CheckpointType getCheckpointType();

  // Custom target location. This is a String, because for future
  // backends it can be a logical location like a DB table.
  @Nullable
  String getTargetLocation();

}
```

This class would be the place to define more options for performing the
checkpoints (for example for incremental checkpoints).

These options are forwarded via the `StreamTask` to the `StreamOperator`s and
`Snapshotable` backends. The `AbstractStreamOperator` checks the options and
either i) forwards the shared per operator `CheckpointStreamFactory` (as of

For this, the state backends provide the following new method:

```
CheckpointStreamFactory createSavepointStreamFactory(JobID, String, String);
```

The `MemoryStateBackend` returns the regular stream factory and the
`FsStateBackend` returns a `FsSavepointStreamFactory`, which writes all
checkpoint streams to a single directory (instead of the regular sub folders
per checkpoint).

We end up with the following directory layout for savepoints:

```
+---------------------------+
| :root_savepoint_directory | (custom per savepoint or configured default via 
`state.savepoints.dir`)
+---------------------------+
  | +---------------------------------------+
  +-| savepoint-:jobId(0, 6)-:random_suffix | (one directory per savepoint)
    +---------------------------------------+
       |
       +- _metadata (one per savepoint)
       +- :uuid (one data file per StreamTask)
       +- ...
       +- :uuid
```


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e7a9174
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e7a9174
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e7a9174

Branch: refs/heads/master
Commit: 6e7a91741708a2b167a2bbca5dda5b2059df5e18
Parents: 1f9f38b
Author: Ufuk Celebi <[email protected]>
Authored: Thu Feb 16 17:56:23 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Thu Feb 23 18:39:49 2017 +0100

----------------------------------------------------------------------
 .../connectors/fs/RollingSinkITCase.java        |   1 -
 .../state/RocksDBKeyedStateBackend.java         |   5 +-
 .../streaming/state/RocksDBStateBackend.java    |   9 ++
 .../state/RocksDBAsyncSnapshotTest.java         |   8 +-
 .../state/RocksDBStateBackendTest.java          |  15 +-
 .../checkpoint/CheckpointCoordinator.java       |  56 ++++++--
 .../runtime/checkpoint/CheckpointOptions.java   | 108 +++++++++++++++
 .../runtime/checkpoint/CompletedCheckpoint.java |   2 +-
 .../runtime/checkpoint/PendingCheckpoint.java   |   3 +-
 .../checkpoint/savepoint/SavepointStore.java    | 137 +++++++++++++------
 .../flink/runtime/executiongraph/Execution.java |   6 +-
 .../io/network/api/CheckpointBarrier.java       |  44 +++++-
 .../api/serialization/EventSerializer.java      |  59 +++++++-
 .../runtime/jobgraph/tasks/StatefulTask.java    |   7 +-
 .../slots/ActorTaskManagerGateway.java          |   6 +-
 .../jobmanager/slots/TaskManagerGateway.java    |   5 +-
 .../jobmaster/RpcTaskManagerGateway.java        |   3 +-
 .../messages/checkpoint/TriggerCheckpoint.java  |  19 ++-
 .../state/AbstractKeyedStateBackend.java        |   3 +-
 .../runtime/state/AbstractStateBackend.java     |   8 ++
 .../state/DefaultOperatorStateBackend.java      |   8 +-
 .../flink/runtime/state/Snapshotable.java       |   5 +-
 .../flink/runtime/state/StateBackend.java       |  22 +++
 .../filesystem/FsCheckpointStreamFactory.java   |  21 +--
 .../filesystem/FsSavepointStreamFactory.java    |  58 ++++++++
 .../state/filesystem/FsStateBackend.java        |   9 ++
 .../state/heap/HeapKeyedStateBackend.java       |   4 +-
 .../state/memory/MemoryStateBackend.java        |   9 ++
 .../runtime/taskexecutor/TaskExecutor.java      |   5 +-
 .../taskexecutor/TaskExecutorGateway.java       |   4 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  10 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   2 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   3 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |  53 ++++---
 .../checkpoint/CheckpointOptionsTest.java       |  48 +++++++
 .../checkpoint/CheckpointStatsHistoryTest.java  |   1 +
 .../savepoint/MigrationV0ToV1Test.java          |   2 +-
 .../savepoint/SavepointLoaderTest.java          |   4 +-
 .../savepoint/SavepointStoreTest.java           |  48 +++++--
 .../io/network/api/CheckpointBarrierTest.java   |  61 +++++++++
 .../api/serialization/EventSerializerTest.java  |  45 ++++--
 .../io/network/api/writer/RecordWriterTest.java |   5 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |   5 +-
 .../messages/CheckpointMessagesTest.java        |   3 +-
 .../runtime/state/OperatorStateBackendTest.java |   3 +-
 .../runtime/state/StateBackendTestBase.java     |  39 +++---
 .../FsSavepointStreamFactoryTest.java           |  67 +++++++++
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   9 +-
 .../api/operators/AbstractStreamOperator.java   |  43 +++++-
 .../api/operators/OperatorSnapshotResult.java   |   2 +-
 .../streaming/api/operators/StreamOperator.java |  12 +-
 .../streaming/runtime/io/BarrierBuffer.java     |   5 +-
 .../streaming/runtime/io/BarrierTracker.java    |   9 +-
 .../streaming/runtime/tasks/OperatorChain.java  |   5 +-
 .../streaming/runtime/tasks/StreamTask.java     |  65 +++++++--
 .../api/checkpoint/ListCheckpointedTest.java    |   2 +-
 .../operators/AbstractStreamOperatorTest.java   |  65 +++++----
 .../AbstractUdfStreamOperatorLifecycleTest.java |  12 +-
 .../WrappingFunctionSnapshotRestoreTest.java    |   2 +-
 .../operators/async/AsyncWaitOperatorTest.java  |   5 +-
 .../io/BarrierBufferAlignmentLimitTest.java     |  13 +-
 .../io/BarrierBufferMassiveRandomTest.java      |   3 +-
 .../streaming/runtime/io/BarrierBufferTest.java |  33 ++---
 .../runtime/io/BarrierTrackerTest.java          |   7 +-
 .../runtime/tasks/BlockingCheckpointsTest.java  |   8 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   |  31 +++--
 .../runtime/tasks/SourceStreamTaskTest.java     |   3 +-
 .../StreamTaskCancellationBarrierTest.java      |   4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |  37 ++---
 .../runtime/tasks/TwoInputStreamTaskTest.java   |  29 ++--
 .../util/AbstractStreamOperatorTestHarness.java |  10 +-
 .../KeyedOneInputStreamOperatorTestHarness.java |   7 +-
 .../test/checkpointing/SavepointITCase.java     |  51 ++++---
 .../streaming/runtime/StateBackendITCase.java   |   7 +-
 74 files changed, 1173 insertions(+), 354 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 80ae294..72f2f21 100644
--- 
a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ 
b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -941,7 +941,6 @@ public class RollingSinkITCase extends 
StreamingMultipleProgramsTestBase {
                }
        }
 
-
        private static class StreamWriterWithConfigCheck<T> extends 
StringWriter<T> {
                private String key;
                private String expect;

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index a0efe78..bd8d4dd 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.migration.MigrationNamespaceSerializerProxy;
 import org.apache.flink.migration.MigrationUtil;
 import org.apache.flink.migration.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
 import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -244,6 +245,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
         * @param checkpointId  The Id of the checkpoint.
         * @param timestamp     The timestamp of the checkpoint.
         * @param streamFactory The factory that we can use for writing our 
state to streams.
+        * @param checkpointOptions Options for how to perform this checkpoint.
         * @return Future to the state handle of the snapshot data.
         * @throws Exception
         */
@@ -251,7 +253,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        public RunnableFuture<KeyGroupsStateHandle> snapshot(
                        final long checkpointId,
                        final long timestamp,
-                       final CheckpointStreamFactory streamFactory) throws 
Exception {
+                       final CheckpointStreamFactory streamFactory,
+                       CheckpointOptions checkpointOptions) throws Exception {
 
                long startTime = System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 6b09a8a..3fd5d0f 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -219,6 +219,15 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
        }
 
        @Override
+       public CheckpointStreamFactory createSavepointStreamFactory(
+                       JobID jobId,
+                       String operatorIdentifier,
+                       String targetLocation) throws IOException {
+
+               return 
checkpointStreamBackend.createSavepointStreamFactory(jobId, operatorIdentifier, 
targetLocation);
+       }
+
+       @Override
        public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
                        Environment env,
                        JobID jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index bce8028..90de7a6 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -186,7 +187,7 @@ public class RocksDBAsyncSnapshotTest {
                        }
                }
 
-               task.triggerCheckpoint(new CheckpointMetaData(42, 17));
+               task.triggerCheckpoint(new CheckpointMetaData(42, 17), 
CheckpointOptions.forFullCheckpoint());
 
                testHarness.processElement(new StreamRecord<>("Wohoo", 0));
 
@@ -266,7 +267,7 @@ public class RocksDBAsyncSnapshotTest {
                        }
                }
 
-               task.triggerCheckpoint(new CheckpointMetaData(42, 17));
+               task.triggerCheckpoint(new CheckpointMetaData(42, 17), 
CheckpointOptions.forFullCheckpoint());
                testHarness.processElement(new StreamRecord<>("Wohoo", 0));
                BlockingStreamMemoryStateBackend.waitFirstWriteLatch.await();
                task.cancel();
@@ -342,7 +343,8 @@ public class RocksDBAsyncSnapshotTest {
                        StringSerializer.INSTANCE,
                        new ValueStateDescriptor<>("foobar", String.class));
 
-               RunnableFuture<KeyGroupsStateHandle> snapshotFuture = 
keyedStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory);
+               RunnableFuture<KeyGroupsStateHandle> snapshotFuture = 
keyedStateBackend.snapshot(
+                       checkpointId, timestamp, checkpointStreamFactory, 
CheckpointOptions.forFullCheckpoint());
 
                try {
                        FutureUtil.runIfNotDoneAndGet(snapshotFuture);

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index dc90666..c7b5c20 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -172,7 +173,8 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
        @Test
        public void testRunningSnapshotAfterBackendClosed() throws Exception {
                setupRocksKeyedStateBackend();
-               RunnableFuture<KeyGroupsStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+               RunnableFuture<KeyGroupsStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory,
+                       CheckpointOptions.forFullCheckpoint());
 
                RocksDB spyDB = keyedStateBackend.db;
 
@@ -209,7 +211,8 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
        @Test
        public void testReleasingSnapshotAfterBackendClosed() throws Exception {
                setupRocksKeyedStateBackend();
-               RunnableFuture<KeyGroupsStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+               RunnableFuture<KeyGroupsStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory,
+                       CheckpointOptions.forFullCheckpoint());
 
                RocksDB spyDB = keyedStateBackend.db;
 
@@ -237,7 +240,7 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
        @Test
        public void testDismissingSnapshot() throws Exception {
                setupRocksKeyedStateBackend();
-               RunnableFuture<KeyGroupsStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+               RunnableFuture<KeyGroupsStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, 
CheckpointOptions.forFullCheckpoint());
                snapshot.cancel(true);
                verifyRocksObjectsReleased();
        }
@@ -245,7 +248,7 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
        @Test
        public void testDismissingSnapshotNotRunnable() throws Exception {
                setupRocksKeyedStateBackend();
-               RunnableFuture<KeyGroupsStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+               RunnableFuture<KeyGroupsStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, 
CheckpointOptions.forFullCheckpoint());
                snapshot.cancel(true);
                Thread asyncSnapshotThread = new Thread(snapshot);
                asyncSnapshotThread.start();
@@ -262,7 +265,7 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
        @Test
        public void testCompletingSnapshot() throws Exception {
                setupRocksKeyedStateBackend();
-               RunnableFuture<KeyGroupsStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+               RunnableFuture<KeyGroupsStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, 
CheckpointOptions.forFullCheckpoint());
                Thread asyncSnapshotThread = new Thread(snapshot);
                asyncSnapshotThread.start();
                waiter.await(); // wait for snapshot to run
@@ -282,7 +285,7 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
        @Test
        public void testCancelRunningSnapshot() throws Exception {
                setupRocksKeyedStateBackend();
-               RunnableFuture<KeyGroupsStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+               RunnableFuture<KeyGroupsStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, 
CheckpointOptions.forFullCheckpoint());
                Thread asyncSnapshotThread = new Thread(snapshot);
                asyncSnapshotThread.start();
                waiter.await(); // wait for snapshot to run

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 36649ad..c1c65b5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -296,15 +298,42 @@ public class CheckpointCoordinator {
                checkNotNull(targetDirectory, "Savepoint target directory");
 
                CheckpointProperties props = 
CheckpointProperties.forStandardSavepoint();
-               CheckpointTriggerResult result = triggerCheckpoint(timestamp, 
props, targetDirectory, false);
 
-               if (result.isSuccess()) {
-                       return 
result.getPendingCheckpoint().getCompletionFuture();
-               }
-               else {
-                       Throwable cause = new Exception("Failed to trigger 
savepoint: " + result.getFailureReason().message());
-                       return 
FlinkCompletableFuture.completedExceptionally(cause);
+               // Create the unique savepoint directory
+               final String savepointDirectory = SavepointStore
+                       .createSavepointDirectory(targetDirectory, job);
+
+               CheckpointTriggerResult triggerResult = triggerCheckpoint(
+                       timestamp,
+                       props,
+                       savepointDirectory,
+                       false);
+
+               Future<CompletedCheckpoint> result;
+
+               if (triggerResult.isSuccess()) {
+                       result = 
triggerResult.getPendingCheckpoint().getCompletionFuture();
+               } else {
+                       Throwable cause = new Exception("Failed to trigger 
savepoint: " + triggerResult.getFailureReason().message());
+                       result = 
FlinkCompletableFuture.completedExceptionally(cause);
                }
+
+               // Make sure to remove the created base directory on Exceptions
+               result.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+                       @Override
+                       public Void apply(Throwable value) {
+                               try {
+                                       
SavepointStore.deleteSavepointDirectory(savepointDirectory);
+                               } catch (Throwable t) {
+                                       LOG.warn("Failed to delete savepoint 
directory " + savepointDirectory
+                                               + " after failed savepoint.", 
t);
+                               }
+
+                               return null;
+                       }
+               }, executor);
+
+               return result;
        }
 
        /**
@@ -517,9 +546,16 @@ public class CheckpointCoordinator {
                                }
                                // end of lock scope
 
+                               CheckpointOptions checkpointOptions;
+                               if (!props.isSavepoint()) {
+                                       checkpointOptions = 
CheckpointOptions.forFullCheckpoint();
+                               } else {
+                                       checkpointOptions = 
CheckpointOptions.forSavepoint(targetDirectory);
+                               }
+
                                // send the messages to the tasks that trigger 
their checkpoint
                                for (Execution execution: executions) {
-                                       
execution.triggerCheckpoint(checkpointID, timestamp);
+                                       
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
                                }
 
                                numUnsuccessfulCheckpointsTriggers.set(0);
@@ -756,7 +792,7 @@ public class CheckpointCoordinator {
 
                        triggerQueuedRequests();
                }
-               
+
                // record the time when this was completed, to calculate
                // the 'min delay between checkpoints'
                lastCheckpointCompletionNanos = System.nanoTime();
@@ -1030,7 +1066,7 @@ public class CheckpointCoordinator {
                        final ExecutionAttemptID executionAttemptID,
                        final long checkpointId,
                        final StateObject stateObject) {
-               
+
                if (stateObject != null) {
                        executor.execute(new Runnable() {
                                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
new file mode 100644
index 0000000..cb98d10
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+
+/**
+ * Options for performing the checkpoint.
+ *
+ * <p>The {@link CheckpointProperties} are related and cover properties that
+ * are only relevant at the {@link CheckpointCoordinator}. These options are
+ * relevant at the {@link StatefulTask} instances running on task managers.
+ */
+public class CheckpointOptions implements Serializable {
+
+       private static final long serialVersionUID = 5010126558083292915L;
+
+       /** Type of the checkpoint. */
+       @Nonnull
+       private final CheckpointType checkpointType;
+
+       /** Target location for the checkpoint. */
+       @Nullable
+       private final String targetLocation;
+
+       private CheckpointOptions(
+                       @Nonnull CheckpointType checkpointType,
+                       String targetLocation) {
+               this.checkpointType = checkNotNull(checkpointType);
+               this.targetLocation = targetLocation;
+       }
+
+       /**
+        * Returns the type of checkpoint to perform.
+        *
+        * @return Type of checkpoint to perform.
+        */
+       @Nonnull
+       public CheckpointType getCheckpointType() {
+               return checkpointType;
+       }
+
+       /**
+        * Returns a custom target location or <code>null</code> if none
+        * was specified.
+        *
+        * @return A custom target location or <code>null</code>.
+        */
+       @Nullable
+       public String getTargetLocation() {
+               return targetLocation;
+       }
+
+       @Override
+       public String toString() {
+               return "CheckpointOptions(" + checkpointType + ")";
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static final CheckpointOptions FULL_CHECKPOINT = new 
CheckpointOptions(CheckpointType.FULL_CHECKPOINT, null);
+
+       public static CheckpointOptions forFullCheckpoint() {
+               return FULL_CHECKPOINT;
+       }
+
+       public static CheckpointOptions forSavepoint(String targetDirectory) {
+               checkNotNull(targetDirectory, "targetDirectory");
+               return new CheckpointOptions(CheckpointType.SAVEPOINT, 
targetDirectory);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        *  The type of checkpoint to perform.
+        */
+       public enum CheckpointType {
+
+               /** A full checkpoint. */
+               FULL_CHECKPOINT,
+
+               /** A savepoint. */
+               SAVEPOINT;
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 52f2a6a..53d888e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -159,7 +159,7 @@ public class CompletedCheckpoint implements Serializable {
        void discard() throws Exception {
                try {
                        if (externalPath != null) {
-                               SavepointStore.removeSavepoint(externalPath);
+                               
SavepointStore.removeSavepointFile(externalPath);
                        }
 
                        
StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 9f66314..908ff7f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -214,7 +214,8 @@ public class PendingCheckpoint {
                                        Savepoint savepoint = new 
SavepointV1(checkpointId, taskStates.values());
                                        externalPath = 
SavepointStore.storeSavepoint(
                                                        targetDirectory,
-                                                       savepoint);
+                                                       savepoint
+                                       );
                                } catch (IOException e) {
                                        LOG.error("Failed to persist checkpoint 
{}.",checkpointId, e);
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 48cca20..0caf5b2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -18,8 +18,16 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
-import org.apache.flink.core.fs.FSDataInputStream;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -28,14 +36,8 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
- * A file system based savepoint store.
+ * Utilities for storing and loading savepoint meta data files.
  *
  * <p>Stored savepoints have the following format:
  * <pre>
@@ -52,50 +54,84 @@ public class SavepointStore {
        /** Magic number for sanity checks against stored savepoints. */
        public static final int MAGIC_NUMBER = 0x4960672d;
 
-       /** Prefix for savepoint files. */
-       private static final String prefix = "savepoint-";
+       private static final String META_DATA_FILE = "_metadata ";
 
        /**
-        * Stores the savepoint.
+        * Creates a savepoint directory.
         *
-        * @param targetDirectory Target directory to store savepoint in
-        * @param savepoint Savepoint to be stored
-        * @param <T>       Savepoint type
-        * @return Path of stored savepoint
-        * @throws Exception Failures during store are forwarded
+        * @param baseDirectory Base target directory for the savepoint
+        * @param jobId Optional JobID the savepoint belongs to
+        * @return The created savepoint directory
+        * @throws IOException FileSystem operation failures are forwarded
         */
-       public static <T extends Savepoint> String storeSavepoint(
-                       String targetDirectory,
-                       T savepoint) throws IOException {
-
-               checkNotNull(targetDirectory, "Target directory");
-               checkNotNull(savepoint, "Savepoint");
+       public static String createSavepointDirectory(@Nonnull String 
baseDirectory, @Nullable JobID jobId) throws IOException {
+               String prefix;
+               if (jobId == null) {
+                       prefix = "savepoint-";
+               } else {
+                       prefix = String.format("savepoint-%s-", 
jobId.toString().substring(0, 6));
+               }
 
                Exception latestException = null;
-               Path path = null;
-               FSDataOutputStream fdos = null;
+               Path savepointDirectory = null;
 
                FileSystem fs = null;
 
                // Try to create a FS output stream
                for (int attempt = 0; attempt < 10; attempt++) {
-                       path = new Path(targetDirectory, 
FileUtils.getRandomFilename(prefix));
+                       Path path = new Path(baseDirectory, 
FileUtils.getRandomFilename(prefix));
 
                        if (fs == null) {
                                fs = FileSystem.get(path.toUri());
                        }
 
                        try {
-                               fdos = fs.create(path, false);
-                               break;
+                               if (fs.mkdirs(path)) {
+                                       savepointDirectory = path;
+                                       break;
+                               }
                        } catch (Exception e) {
                                latestException = e;
                        }
                }
 
-               if (fdos == null) {
-                       throw new IOException("Failed to create file output 
stream at " + path, latestException);
+               if (savepointDirectory == null) {
+                       throw new IOException("Failed to create savepoint 
directory at " + baseDirectory, latestException);
+               } else {
+                       return savepointDirectory.getPath();
                }
+       }
+
+       /**
+        * Deletes a savepoint directory.
+        *
+        * @param savepointDirectory Recursively deletes the given directory
+        * @throws IOException FileSystem operation failures are forwarded
+        */
+       public static void deleteSavepointDirectory(@Nonnull String 
savepointDirectory) throws IOException {
+               Path path = new Path(savepointDirectory);
+               FileSystem fs = FileSystem.get(path.toUri());
+               fs.delete(path, true);
+       }
+
+       /**
+        * Stores the savepoint metadata file.
+        *
+        * @param <T>       Savepoint type
+        * @param directory Target directory to store savepoint in
+        * @param savepoint Savepoint to be stored
+        * @return Path of stored savepoint
+        * @throws Exception Failures during store are forwarded
+        */
+       public static <T extends Savepoint> String storeSavepoint(String 
directory, T savepoint) throws IOException {
+               checkNotNull(directory, "Target directory");
+               checkNotNull(savepoint, "Savepoint");
+
+               Path basePath = new Path(directory);
+               FileSystem fs = FileSystem.get(basePath.toUri());
+
+               Path path = new Path(basePath, META_DATA_FILE);
+               FSDataOutputStream fdos = fs.create(path, false);
 
                boolean success = false;
                try (DataOutputStream dos = new DataOutputStream(fdos)) {
@@ -115,20 +151,41 @@ public class SavepointStore {
                        }
                }
 
-               return path.toString();
+               return basePath.toString();
        }
 
        /**
         * Loads the savepoint at the specified path.
         *
-        * @param path Path of savepoint to load
+        * @param savepointFileOrDirectory Path to the parent savepoint 
directory or the meta data file.
         * @return The loaded savepoint
         * @throws Exception Failures during load are forwared
         */
-       public static Savepoint loadSavepoint(String path, ClassLoader 
userClassLoader) throws IOException {
-               Preconditions.checkNotNull(path, "Path");
+       public static Savepoint loadSavepoint(String savepointFileOrDirectory, 
ClassLoader userClassLoader) throws IOException {
+               Preconditions.checkNotNull(savepointFileOrDirectory, "Path");
+
+               Path path = new Path(savepointFileOrDirectory);
+
+               LOG.info("Loading savepoint from {}", path);
 
-               try (DataInputStream dis = new 
DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) {
+               FileSystem fs = FileSystem.get(path.toUri());
+
+               FileStatus status = fs.getFileStatus(path);
+
+               // If this is a directory, we need to find the meta data file
+               if (status.isDir()) {
+                       Path candidatePath = new Path(path, META_DATA_FILE);
+                       if (fs.exists(candidatePath)) {
+                               path = candidatePath;
+                               LOG.info("Using savepoint file in {}", path);
+                       } else {
+                               throw new IOException("Cannot find meta data 
file in directory " + path
+                                       + ". Please try to load the savepoint 
directly from the meta data file "
+                                       + "instead of the directory.");
+                       }
+               }
+
+               try (DataInputStream dis = new 
DataInputViewStreamWrapper(fs.open(path))) {
                        int magicNumber = dis.readInt();
 
                        if (magicNumber == MAGIC_NUMBER) {
@@ -152,7 +209,7 @@ public class SavepointStore {
         * @param path Path of savepoint to remove
         * @throws Exception Failures during disposal are forwarded
         */
-       public static void removeSavepoint(String path) throws IOException {
+       public static void removeSavepointFile(String path) throws IOException {
                Preconditions.checkNotNull(path, "Path");
 
                try {
@@ -173,14 +230,4 @@ public class SavepointStore {
                }
        }
 
-       private static FSDataInputStream createFsInputStream(Path path) throws 
IOException {
-               FileSystem fs = FileSystem.get(path.toUri());
-
-               if (fs.exists(path)) {
-                       return fs.open(path);
-               } else {
-                       throw new IllegalArgumentException("Invalid path '" + 
path.toUri() + "'.");
-               }
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index b3fe443..3191d76 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
@@ -675,14 +676,15 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
         *
         * @param checkpointId of th checkpoint to trigger
         * @param timestamp of the checkpoint to trigger
+        * @param checkpointOptions of the checkpoint to trigger
         */
-       public void triggerCheckpoint(long checkpointId, long timestamp) {
+       public void triggerCheckpoint(long checkpointId, long timestamp, 
CheckpointOptions checkpointOptions) {
                final SimpleSlot slot = assignedResource;
 
                if (slot != null) {
                        final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
 
-                       taskManagerGateway.triggerCheckpoint(attemptId, 
getVertex().getJobId(), checkpointId, timestamp);
+                       taskManagerGateway.triggerCheckpoint(attemptId, 
getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
                } else {
                        LOG.debug("The execution has no slot assigned. This 
indicates that the execution is " +
                                "no longer running.");

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
index 59f56b0..0752897 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -18,10 +18,15 @@
 
 package org.apache.flink.runtime.io.network.api;
 
+import static org.apache.flink.util.Preconditions.checkElementIndex;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 import java.io.IOException;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
 import org.apache.flink.runtime.event.RuntimeEvent;
 
 /**
@@ -43,12 +48,14 @@ public class CheckpointBarrier extends RuntimeEvent {
 
        private long id;
        private long timestamp;
+       private CheckpointOptions checkpointOptions;
 
        public CheckpointBarrier() {}
 
-       public CheckpointBarrier(long id, long timestamp) {
+       public CheckpointBarrier(long id, long timestamp, CheckpointOptions 
checkpointOptions) {
                this.id = id;
                this.timestamp = timestamp;
+               this.checkpointOptions = checkNotNull(checkpointOptions);
        }
 
        public long getId() {
@@ -59,20 +66,53 @@ public class CheckpointBarrier extends RuntimeEvent {
                return timestamp;
        }
 
+       public CheckpointOptions getCheckpointOptions() {
+               return checkpointOptions;
+       }
+
+       // 
------------------------------------------------------------------------
+       // Serialization
        // 
------------------------------------------------------------------------
        
        @Override
        public void write(DataOutputView out) throws IOException {
                out.writeLong(id);
                out.writeLong(timestamp);
+               CheckpointType checkpointType = 
checkpointOptions.getCheckpointType();
+
+               out.writeInt(checkpointType.ordinal());
+
+               if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+                       return;
+               } else if (checkpointType == CheckpointType.SAVEPOINT) {
+                       String targetLocation = 
checkpointOptions.getTargetLocation();
+                       assert(targetLocation != null);
+                       out.writeUTF(targetLocation);
+               } else {
+                       throw new IOException("Unknown CheckpointType " + 
checkpointType);
+               }
        }
 
        @Override
        public void read(DataInputView in) throws IOException {
                id = in.readLong();
                timestamp = in.readLong();
+
+               int typeOrdinal = in.readInt();
+               checkElementIndex(typeOrdinal, CheckpointType.values().length, 
"Unknown CheckpointType ordinal " + typeOrdinal);
+               CheckpointType checkpointType = 
CheckpointType.values()[typeOrdinal];
+
+               if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+                       checkpointOptions = 
CheckpointOptions.forFullCheckpoint();
+               } else if (checkpointType == CheckpointType.SAVEPOINT) {
+                       String targetLocation = in.readUTF();
+                       checkpointOptions = 
CheckpointOptions.forSavepoint(targetLocation);
+               } else {
+                       throw new IOException("Illegal CheckpointType " + 
checkpointType);
+               }
        }
-       
+
+
        // 
------------------------------------------------------------------------
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 4d9f431..223cbfe 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -18,8 +18,11 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
+import java.nio.charset.Charset;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -34,6 +37,7 @@ import org.apache.flink.util.InstantiationUtil;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import org.apache.flink.util.Preconditions;
 
 /**
  * Utility class to serialize and deserialize task events.
@@ -60,10 +64,34 @@ public class EventSerializer {
                else if (eventClass == CheckpointBarrier.class) {
                        CheckpointBarrier barrier = (CheckpointBarrier) event;
 
-                       ByteBuffer buf = ByteBuffer.allocate(20);
-                       buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
-                       buf.putLong(4, barrier.getId());
-                       buf.putLong(12, barrier.getTimestamp());
+                       CheckpointOptions checkpointOptions = 
barrier.getCheckpointOptions();
+                       CheckpointType checkpointType = 
checkpointOptions.getCheckpointType();
+
+                       ByteBuffer buf;
+                       if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+                               buf = ByteBuffer.allocate(24);
+                               buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
+                               buf.putLong(4, barrier.getId());
+                               buf.putLong(12, barrier.getTimestamp());
+                               buf.putInt(20, checkpointType.ordinal());
+                       } else if (checkpointType == CheckpointType.SAVEPOINT) {
+                               String targetLocation = 
checkpointOptions.getTargetLocation();
+                               assert(targetLocation != null);
+                               byte[] bytes = 
targetLocation.getBytes(Charset.forName("UTF-8"));
+
+                               buf = ByteBuffer.allocate(24 + 4 + 
bytes.length);
+                               buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
+                               buf.putLong(4, barrier.getId());
+                               buf.putLong(12, barrier.getTimestamp());
+                               buf.putInt(20, checkpointType.ordinal());
+                               buf.putInt(24, bytes.length);
+                               for (int i = 0; i < bytes.length; i++) {
+                                       buf.put(28 + i, bytes[i]);
+                               }
+                       } else {
+                               throw new IOException("Unknown checkpoint type: 
" + checkpointType);
+                       }
+
                        return buf;
                }
                else if (eventClass == EndOfSuperstepEvent.class) {
@@ -172,7 +200,28 @@ public class EventSerializer {
                        else if (type == CHECKPOINT_BARRIER_EVENT) {
                                long id = buffer.getLong();
                                long timestamp = buffer.getLong();
-                               return new CheckpointBarrier(id, timestamp);
+
+                               CheckpointOptions checkpointOptions;
+
+                               int checkpointTypeOrdinal = buffer.getInt();
+                               Preconditions.checkElementIndex(type, 
CheckpointType.values().length,
+                                       "Illegal CheckpointType ordinal " + 
checkpointTypeOrdinal);
+                               CheckpointType checkpointType = 
CheckpointType.values()[checkpointTypeOrdinal];
+
+                               if (checkpointType == 
CheckpointType.FULL_CHECKPOINT) {
+                                       checkpointOptions = 
CheckpointOptions.forFullCheckpoint();
+                               } else if (checkpointType == 
CheckpointType.SAVEPOINT) {
+                                       int len = buffer.getInt();
+                                       byte[] bytes = new byte[len];
+                                       buffer.get(bytes);
+                                       String targetLocation = new 
String(bytes, Charset.forName("UTF-8"));
+
+                                       checkpointOptions = 
CheckpointOptions.forSavepoint(targetLocation);
+                               } else {
+                                       throw new IOException("Unknown 
checkpoint type: " + checkpointType);
+                               }
+
+                               return new CheckpointBarrier(id, timestamp, 
checkpointOptions);
                        }
                        else if (type == END_OF_SUPERSTEP_EVENT) {
                                return EndOfSuperstepEvent.INSTANCE;

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index 87b66ce..0930011 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.state.TaskStateHandles;
 
 /**
@@ -46,21 +47,23 @@ public interface StatefulTask {
         * method.
         *
         * @param checkpointMetaData Meta data for about this checkpoint
+        * @param checkpointOptions Options for performing this checkpoint
         *
         * @return {@code false} if the checkpoint can not be carried out, 
{@code true} otherwise
         */
-       boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws 
Exception;
+       boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, 
CheckpointOptions checkpointOptions) throws Exception;
 
        /**
         * This method is called when a checkpoint is triggered as a result of 
receiving checkpoint
         * barriers on all input streams.
         * 
         * @param checkpointMetaData Meta data for about this checkpoint
+        * @param checkpointOptions Options for performing this checkpoint
         * @param checkpointMetrics Metrics about this checkpoint
         * 
         * @throws Exception Exceptions thrown as the result of triggering a 
checkpoint are forwarded.
         */
-       void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, 
CheckpointMetrics checkpointMetrics) throws Exception;
+       void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, 
CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) 
throws Exception;
 
        /**
         * Aborts a checkpoint as the result of receiving possibly some 
checkpoint barriers,

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
index fe4ecfb..2876ebe 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager.slots;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import org.apache.flink.runtime.concurrent.Future;
@@ -196,12 +197,13 @@ public class ActorTaskManagerGateway implements 
TaskManagerGateway {
                        ExecutionAttemptID executionAttemptID,
                        JobID jobId,
                        long checkpointId,
-                       long timestamp) {
+                       long timestamp,
+                       CheckpointOptions checkpointOptions) {
 
                Preconditions.checkNotNull(executionAttemptID);
                Preconditions.checkNotNull(jobId);
 
-               actorGateway.tell(new TriggerCheckpoint(jobId, 
executionAttemptID, checkpointId, timestamp));
+               actorGateway.tell(new TriggerCheckpoint(jobId, 
executionAttemptID, checkpointId, timestamp, checkpointOptions));
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
index db0a3bf..09f104f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager.slots;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -160,12 +161,14 @@ public interface TaskManagerGateway {
         * @param jobId identifying the job to which the task belongs
         * @param checkpointId of the checkpoint to trigger
         * @param timestamp of the checkpoint to trigger
+        * @param checkpointOptions of the checkpoint to trigger
         */
        void triggerCheckpoint(
                ExecutionAttemptID executionAttemptID,
                JobID jobId,
                long checkpointId,
-               long timestamp);
+               long timestamp,
+               CheckpointOptions checkpointOptions);
 
        /**
         * Request the task manager log from the task manager.

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index eba97d2..28fef27 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -123,7 +124,7 @@ public class RpcTaskManagerGateway implements 
TaskManagerGateway {
        }
 
        @Override
-       public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, 
JobID jobId, long checkpointId, long timestamp) {
+       public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, 
JobID jobId, long checkpointId, long timestamp, CheckpointOptions 
checkpointOptions) {
 //             taskExecutorGateway.triggerCheckpoint(executionAttemptID, 
jobId, checkpointId, timestamp);
                throw new UnsupportedOperationException("Operation is not yet 
supported.");
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
index 0528755..3477e13 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.runtime.messages.checkpoint;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 
 /**
@@ -33,9 +36,19 @@ public class TriggerCheckpoint extends 
AbstractCheckpointMessage implements java
        /** The timestamp associated with the checkpoint */
        private final long timestamp;
 
-       public TriggerCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, 
long checkpointId, long timestamp) {
+       /** Options for how to perform the checkpoint. */
+       private final CheckpointOptions checkpointOptions;
+
+       public TriggerCheckpoint(
+                       JobID job,
+                       ExecutionAttemptID taskExecutionId,
+                       long checkpointId,
+                       long timestamp,
+                       CheckpointOptions checkpointOptions) {
+
                super(job, taskExecutionId, checkpointId);
                this.timestamp = timestamp;
+               this.checkpointOptions = checkNotNull(checkpointOptions);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -44,6 +57,10 @@ public class TriggerCheckpoint extends 
AbstractCheckpointMessage implements java
                return timestamp;
        }
 
+       public CheckpointOptions getCheckpointOptions() {
+               return checkpointOptions;
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 3ed49f1..14f897f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -36,6 +36,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -54,7 +55,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Base implementation of KeyedStateBackend. The state can be checkpointed
- * to streams using {@link #snapshot(long, long, CheckpointStreamFactory)}.
+ * to streams using {@link #snapshot(long, long, CheckpointStreamFactory, 
CheckpointOptions)}.
  *
  * @param <K> Type of the key by which state is keyed.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index bc4594a..a335e45 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 
 /**
@@ -31,6 +32,7 @@ import java.io.IOException;
  */
 @PublicEvolving
 public abstract class AbstractStateBackend implements StateBackend, 
java.io.Serializable {
+
        private static final long serialVersionUID = 4620415814639230247L;
 
        @Override
@@ -39,6 +41,12 @@ public abstract class AbstractStateBackend implements 
StateBackend, java.io.Seri
                        String operatorIdentifier) throws IOException;
 
        @Override
+       public abstract CheckpointStreamFactory createSavepointStreamFactory(
+                       JobID jobId,
+                       String operatorIdentifier,
+                       @Nullable String targetLocation) throws IOException;
+
+       @Override
        public abstract <K> AbstractKeyedStateBackend<K> 
createKeyedStateBackend(
                        Environment env,
                        JobID jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index adf0727..8dcf49e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -30,6 +30,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -154,7 +155,10 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
 
        @Override
        public RunnableFuture<OperatorStateHandle> snapshot(
-                       long checkpointId, long timestamp, 
CheckpointStreamFactory streamFactory) throws Exception {
+                       long checkpointId,
+                       long timestamp,
+                       CheckpointStreamFactory streamFactory,
+                       CheckpointOptions checkpointOptions) throws Exception {
 
                if (registeredStates.isEmpty()) {
                        return new DoneFuture<>(null);
@@ -346,4 +350,4 @@ public class DefaultOperatorStateBackend implements 
OperatorStateBackend {
                        return partitionOffsets;
                }
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
index a4a6bc4..0d92b46 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
 
 import java.util.Collection;
 import java.util.concurrent.RunnableFuture;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 
 /**
  * Interface for operations that can perform snapshots of their state.
@@ -37,12 +38,14 @@ public interface Snapshotable<S extends StateObject> {
         * @param checkpointId  The ID of the checkpoint.
         * @param timestamp     The timestamp of the checkpoint.
         * @param streamFactory The factory that we can use for writing our 
state to streams.
+        * @param checkpointOptions Options for how to perform this checkpoint.
         * @return A runnable future that will yield a {@link StateObject}.
         */
        RunnableFuture<S> snapshot(
                        long checkpointId,
                        long timestamp,
-                       CheckpointStreamFactory streamFactory) throws Exception;
+                       CheckpointStreamFactory streamFactory,
+                       CheckpointOptions checkpointOptions) throws Exception;
 
        /**
         * Restores state that was previously snapshotted from the provided 
parameters. Typically the parameters are state

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index 846df89..7961b5e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 
 /**
@@ -95,6 +96,27 @@ public interface StateBackend extends java.io.Serializable {
         */
        CheckpointStreamFactory createStreamFactory(JobID jobId, String 
operatorIdentifier) throws IOException;
 
+       /**
+        * Creates a {@link CheckpointStreamFactory} that can be used to create 
streams
+        * that should end up in a savepoint.
+        *
+        * <p>This is only called if the triggered checkpoint is a savepoint. 
Commonly
+        * this will return the same factory as for regular checkpoints, but 
maybe
+        * slightly adjusted.
+        *
+        * @param jobId The {@link JobID} of the job for which we are creating 
checkpoint streams.
+        * @param operatorIdentifier An identifier of the operator for which we 
create streams.
+        * @param targetLocation An optional custom location for the savepoint 
stream.
+        * 
+        * @return The stream factory for savepoints.
+        * 
+        * @throws IOException Failures during stream creation are forwarded.
+        */
+       CheckpointStreamFactory createSavepointStreamFactory(
+                       JobID jobId,
+                       String operatorIdentifier,
+                       @Nullable String targetLocation) throws IOException;
+
        // 
------------------------------------------------------------------------
        //  Structure Backends 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 30b1da6..8455d84 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -94,18 +94,15 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
                                MAX_FILE_STATE_THRESHOLD);
                }
                this.fileStateThreshold = fileStateSizeThreshold;
+
                Path basePath = checkpointDataUri;
+               filesystem = basePath.getFileSystem();
 
-               Path dir = new Path(basePath, jobId.toString());
+               checkpointDirectory = createBasePath(filesystem, basePath, 
jobId);
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("Initializing file stream factory to URI 
{}.", dir);
+                       LOG.debug("Initialed file stream factory to URI {}.", 
checkpointDirectory);
                }
-
-               filesystem = basePath.getFileSystem();
-               filesystem.mkdirs(dir);
-
-               checkpointDirectory = dir;
        }
 
        @Override
@@ -115,7 +112,7 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
        public FsCheckpointStateOutputStream 
createCheckpointStateOutputStream(long checkpointID, long timestamp) throws 
Exception {
                checkFileSystemInitialized();
 
-               Path checkpointDir = createCheckpointDirPath(checkpointID);
+               Path checkpointDir = 
createCheckpointDirPath(checkpointDirectory, checkpointID);
                int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, 
fileStateThreshold);
                return new FsCheckpointStateOutputStream(checkpointDir, 
filesystem, bufferSize, fileStateThreshold);
        }
@@ -130,7 +127,13 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
                }
        }
 
-       private Path createCheckpointDirPath(long checkpointID) {
+       protected Path createBasePath(FileSystem fs, Path checkpointDirectory, 
JobID jobID) throws IOException {
+               Path dir = new Path(checkpointDirectory, jobID.toString());
+               fs.mkdirs(dir);
+               return dir;
+       }
+
+       protected Path createCheckpointDirPath(Path checkpointDirectory, long 
checkpointID) {
                return new Path(checkpointDirectory, "chk-" + checkpointID);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
new file mode 100644
index 0000000..7410d2d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import java.io.IOException;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+
+/**
+ * A {@link CheckpointStreamFactory} that produces streams that write to a
+ * {@link FileSystem}.
+ *
+ * <p>The difference to the parent {@link FsCheckpointStreamFactory} is only
+ * in the created directory layout. All checkpoint files go to the checkpoint
+ * directory.
+ */
+public class FsSavepointStreamFactory extends FsCheckpointStreamFactory {
+
+       public FsSavepointStreamFactory(
+                       Path checkpointDataUri,
+                       JobID jobId,
+                       int fileStateSizeThreshold) throws IOException {
+
+               super(checkpointDataUri, jobId, fileStateSizeThreshold);
+       }
+
+       @Override
+       protected Path createBasePath(FileSystem fs, Path checkpointDirectory, 
JobID jobID) throws IOException {
+               // No checkpoint specific directory required as the savepoint 
directory
+               // is already unique.
+               return checkpointDirectory;
+       }
+
+       @Override
+       protected Path createCheckpointDirPath(Path checkpointDirectory, long 
checkpointID) {
+               // No checkpoint specific directory required as the savepoint 
directory
+               // is already unique.
+               return checkpointDirectory;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 281dbb0..b614d98 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -173,6 +173,15 @@ public class FsStateBackend extends AbstractStateBackend {
        }
 
        @Override
+       public CheckpointStreamFactory createSavepointStreamFactory(
+                       JobID jobId,
+                       String operatorIdentifier,
+                       String targetLocation) throws IOException {
+
+               return new FsSavepointStreamFactory(new Path(targetLocation), 
jobId, fileStateThreshold);
+       }
+
+       @Override
        public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
                        Environment env,
                        JobID jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 04e4fbc..4a5455a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -40,6 +40,7 @@ import org.apache.flink.migration.MigrationUtil;
 import org.apache.flink.migration.runtime.state.KvStateSnapshot;
 import 
org.apache.flink.migration.runtime.state.filesystem.AbstractFsStateSnapshot;
 import 
org.apache.flink.migration.runtime.state.memory.AbstractMemStateSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.ArrayListSerializer;
@@ -215,7 +216,8 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        public RunnableFuture<KeyGroupsStateHandle> snapshot(
                        long checkpointId,
                        long timestamp,
-                       CheckpointStreamFactory streamFactory) throws Exception 
{
+                       CheckpointStreamFactory streamFactory,
+                       CheckpointOptions checkpointOptions) throws Exception {
 
                if (stateTables.isEmpty()) {
                        return new DoneFuture<>(null);

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 58a86df..2cc1164 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -75,6 +75,15 @@ public class MemoryStateBackend extends AbstractStateBackend 
{
        }
 
        @Override
+       public CheckpointStreamFactory createSavepointStreamFactory(
+                       JobID jobId,
+                       String operatorIdentifier,
+                       String targetLocation) throws IOException {
+
+               return new MemCheckpointStreamFactory(maxStateSize);
+       }
+
+       @Override
        public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
                        Environment env, JobID jobID,
                        String operatorIdentifier,

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 2980376..8db1d5b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
@@ -475,13 +476,13 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        // 
----------------------------------------------------------------------
 
        @RpcMethod
-       public Acknowledge triggerCheckpoint(ExecutionAttemptID 
executionAttemptID, long checkpointId, long checkpointTimestamp) throws 
CheckpointException {
+       public Acknowledge triggerCheckpoint(ExecutionAttemptID 
executionAttemptID, long checkpointId, long checkpointTimestamp, 
CheckpointOptions checkpointOptions) throws CheckpointException {
                log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, 
checkpointTimestamp, executionAttemptID);
 
                final Task task = taskSlotTable.getTask(executionAttemptID);
 
                if (task != null) {
-                       task.triggerCheckpointBarrier(checkpointId, 
checkpointTimestamp);
+                       task.triggerCheckpointBarrier(checkpointId, 
checkpointTimestamp, checkpointOptions);
 
                        return Acknowledge.get();
                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index ebd4c0c..36a3255 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
@@ -97,9 +98,10 @@ public interface TaskExecutorGateway extends RpcGateway {
         * @param executionAttemptID identifying the task
         * @param checkpointID unique id for the checkpoint
         * @param checkpointTimestamp is the timestamp when the checkpoint has 
been initiated
+        * @param checkpointOptions for performing the checkpoint
         * @return Future acknowledge if the checkpoint has been successfully 
triggered
         */
-       Future<Acknowledge> triggerCheckpoint(ExecutionAttemptID 
executionAttemptID, long checkpointID, long checkpointTimestamp);
+       Future<Acknowledge> triggerCheckpoint(ExecutionAttemptID 
executionAttemptID, long checkpointID, long checkpointTimestamp, 
CheckpointOptions checkpointOptions);
 
        /**
         * Confirm a checkpoint for the given task. The checkpoint is 
identified by the checkpoint ID

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index acb423b..c9f17b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
 import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -1117,8 +1118,13 @@ public class Task implements Runnable, TaskActions {
         * 
         * @param checkpointID The ID identifying the checkpoint.
         * @param checkpointTimestamp The timestamp associated with the 
checkpoint.
+        * @param checkpointOptions Options for performing this checkpoint.
         */
-       public void triggerCheckpointBarrier(final long checkpointID, long 
checkpointTimestamp) {
+       public void triggerCheckpointBarrier(
+                       final long checkpointID,
+                       long checkpointTimestamp,
+                       final CheckpointOptions checkpointOptions) {
+
                final AbstractInvokable invokable = this.invokable;
                final CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointID, checkpointTimestamp);
 
@@ -1134,7 +1140,7 @@ public class Task implements Runnable, TaskActions {
                                                // activate safety net for 
checkpointing thread
                                                
FileSystemSafetyNet.initializeSafetyNetForThread();
                                                try {
-                                                       boolean success = 
statefulTask.triggerCheckpoint(checkpointMetaData);
+                                                       boolean success = 
statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                                                        if (!success) {
                                                                
checkpointResponder.declineCheckpoint(
                                                                                
getJobID(), getExecutionId(), checkpointID,

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8b08181..21749cb 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -837,7 +837,7 @@ class JobManager(
           savepoint.dispose()
 
           // Remove the header file
-          SavepointStore.removeSavepoint(savepointPath)
+          SavepointStore.removeSavepointFile(savepointPath)
 
           senderRef ! DisposeSavepointSuccess
         } catch {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a70454b..25d5366 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -501,12 +501,13 @@ class TaskManager(
         val taskExecutionId = message.getTaskExecutionId
         val checkpointId = message.getCheckpointId
         val timestamp = message.getTimestamp
+        val checkpointOptions = message.getCheckpointOptions
 
         log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for 
$taskExecutionId.")
 
         val task = runningTasks.get(taskExecutionId)
         if (task != null) {
-          task.triggerCheckpointBarrier(checkpointId, timestamp)
+          task.triggerCheckpointBarrier(checkpointId, timestamp, 
checkpointOptions)
         } else {
           log.debug(s"TaskManager received a checkpoint request for unknown 
task $taskExecutionId.")
         }

Reply via email to