Repository: flink
Updated Branches:
  refs/heads/master c7d1a3b8d -> fd410d9f6


http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index cdeef94..0a7f65e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.streaming.api.environment;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.streaming.api.CheckpointingMode;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Configuration that captures all checkpointing related settings.
@@ -64,6 +67,9 @@ public class CheckpointConfig implements java.io.Serializable {
        /** Flag to force checkpointing in iterative jobs */
        private boolean forceCheckpointing;
 
+       /** Cleanup behaviour for persistent checkpoints. */
+       private ExternalizedCheckpointCleanup externalizedCheckpointCleanup;
+
        // ------------------------------------------------------------------------
 
        /**
@@ -223,4 +229,99 @@ public class CheckpointConfig implements java.io.Serializable {
        public void setForceCheckpointing(boolean forceCheckpointing) {
                this.forceCheckpointing = forceCheckpointing;
        }
+
+       /**
+        * Enables checkpoints to be persisted externally.
+        *
+        * <p>Externalized checkpoints write their meta data out to persistent
+        * storage and are <strong>not</strong> automatically cleaned up when
+        * the owning job fails (terminating with job status {@link JobStatus#FAILED}).
+        * In this case, you have to manually clean up the checkpoint state, both
+        * the meta data and actual program state.
+        *
+        * <p>The {@link ExternalizedCheckpointCleanup} mode defines how an
+        * externalized checkpoint should be cleaned up on job cancellation. If you
+        * choose to retain externalized checkpoints on cancellation you have to
+        * handle checkpoint clean up manually when you cancel the job as well
+        * (terminating with job status {@link JobStatus#CANCELED}).
+        *
+        * <p>The target directory for externalized checkpoints is configured
+        * via {@link ConfigConstants#CHECKPOINTS_DIRECTORY_KEY}.
+        *
+        * @param cleanupMode Externalized checkpoint cleanup behaviour.
+        */
+       @PublicEvolving
+       public void enableExternalizedCheckpoints(ExternalizedCheckpointCleanup cleanupMode) {
+               this.externalizedCheckpointCleanup = checkNotNull(cleanupMode);
+       }
+
+       /**
+        * Returns whether checkpoints should be persisted externally.
+        *
+        * @return <code>true</code> if checkpoints should be externalized.
+        */
+       @PublicEvolving
+       public boolean isExternalizedCheckpointsEnabled() {
+               return externalizedCheckpointCleanup != null;
+       }
+
+       /**
+        * Returns the cleanup behaviour for externalized checkpoints.
+        *
+        * @return The cleanup behaviour for externalized checkpoints or
+        * <code>null</code> if none is configured.
+        */
+       @PublicEvolving
+       public ExternalizedCheckpointCleanup getExternalizedCheckpointCleanup() {
+               return externalizedCheckpointCleanup;
+       }
+
+       /**
+        * Cleanup behaviour for externalized checkpoints when the job is cancelled.
+        */
+       @PublicEvolving
+       public enum ExternalizedCheckpointCleanup {
+
+               /**
+                * Delete externalized checkpoints on job cancellation.
+                *
+                * <p>All checkpoint state will be deleted when you cancel the owning
+                * job, both the meta data and actual program state. Therefore, you
+                * cannot resume from externalized checkpoints after the job has been
+                * cancelled.
+                *
+                * <p>Note that checkpoint state is always kept if the job terminates
+                * with state {@link JobStatus#FAILED}.
+                */
+               DELETE_ON_CANCELLATION(true),
+
+               /**
+                * Retain externalized checkpoints on job cancellation.
+                *
+                * <p>All checkpoint state is kept when you cancel the owning job. You
+                * have to manually delete both the checkpoint meta data and actual
+                * program state after cancelling the job.
+                *
+                * <p>Note that checkpoint state is always kept if the job terminates
+                * with state {@link JobStatus#FAILED}.
+                */
+               RETAIN_ON_CANCELLATION(false);
+
+               private final boolean deleteOnCancellation;
+
+               ExternalizedCheckpointCleanup(boolean deleteOnCancellation) {
+                       this.deleteOnCancellation = deleteOnCancellation;
+               }
+
+               /**
+                * Returns whether persistent checkpoints shall be discarded on
+                * cancellation of the job.
+                *
+                * @return <code>true</code> if persistent checkpoints shall be discarded
+                * on cancellation of the job.
+                */
+               public boolean deleteOnCancellation() {
+                       return deleteOnCancellation;
+               }
+       }
 }
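
For context, a minimal sketch of how a user program opts into the new
externalized checkpoints (the environment setup is assumed boilerplate, not
part of this change):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(10000);
    // Keep the checkpoint (meta data and state) when the job is cancelled;
    // DELETE_ON_CANCELLATION would discard it on a clean cancel instead.
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);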

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 8fc2872..e1cfc60 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -89,7 +89,7 @@ import java.util.List;
  * The StreamExecutionEnvironment is the context in which a streaming program is executed. A
  * {@link LocalStreamEnvironment} will cause execution in the current JVM, a
  * {@link RemoteStreamEnvironment} will cause execution on a remote setup.
- * 
+ *
  * <p>The environment provides methods to control the job execution (such as setting the parallelism
  * or the fault tolerance/checkpointing parameters) and to interact with the outside world (data access).
  *
@@ -101,7 +101,7 @@ public abstract class StreamExecutionEnvironment {
 
        /** The default name to use for a streaming job if no other name has been specified */
        public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";
-       
+
        /** The time characteristic that is used if none other is set */
        private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
 
@@ -113,28 +113,28 @@ public abstract class StreamExecutionEnvironment {
 
        /** The default parallelism used when creating a local environment */
        private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
-       
+
        // ------------------------------------------------------------------------
 
        /** The execution configuration for this environment */
        private final ExecutionConfig config = new ExecutionConfig();
-       
-       /** Settings that control the checkpointing behavior */ 
+
+       /** Settings that control the checkpointing behavior */
        private final CheckpointConfig checkpointCfg = new CheckpointConfig();
-       
+
        protected final List<StreamTransformation<?>> transformations = new ArrayList<>();
-       
+
        private long bufferTimeout = DEFAULT_NETWORK_BUFFER_TIMEOUT;
-       
+
        protected boolean isChainingEnabled = true;
-       
+
        /** The state backend used for storing k/v state and state snapshots */
        private AbstractStateBackend defaultStateBackend;
-       
+
        /** The time characteristic used by the data streams */
        private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
 
-       
+
        // --------------------------------------------------------------------------------------------
        // Constructor and Properties
        // --------------------------------------------------------------------------------------------
@@ -274,7 +274,7 @@ public abstract class StreamExecutionEnvironment {
        /**
         * Gets the checkpoint config, which defines values like checkpoint interval, delay between
         * checkpoints, etc.
-        * 
+        *
         * @return The checkpoint config.
         */
        public CheckpointConfig getCheckpointConfig() {
@@ -286,13 +286,13 @@ public abstract class StreamExecutionEnvironment {
         * dataflow will be periodically snapshotted. In case of a failure, the streaming
         * dataflow will be restarted from the latest completed checkpoint. This method selects
         * {@link CheckpointingMode#EXACTLY_ONCE} guarantees.
-        * 
+        *
         * <p>The job draws checkpoints periodically, in the given interval. The state will be
         * stored in the configured state backend.</p>
-        * 
+        *
         * <p>NOTE: Checkpointing iterative streaming dataflows is not properly supported at
         * the moment. For that reason, iterative jobs will not be started if used
-        * with enabled checkpointing. To override this mechanism, use the 
+        * with enabled checkpointing. To override this mechanism, use the
         * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.</p>
         *
         * @param interval Time interval between state checkpoints in milliseconds.
@@ -313,12 +313,12 @@ public abstract class StreamExecutionEnvironment {
         *
         * <p>NOTE: Checkpointing iterative streaming dataflows is not properly supported at
         * the moment. For that reason, iterative jobs will not be started if used
-        * with enabled checkpointing. To override this mechanism, use the 
+        * with enabled checkpointing. To override this mechanism, use the
         * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.</p>
         *
-        * @param interval 
+        * @param interval
         *             Time interval between state checkpoints in milliseconds.
-        * @param mode 
+        * @param mode
         *             The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
         */
        public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode) {
@@ -326,7 +326,7 @@ public abstract class StreamExecutionEnvironment {
                checkpointCfg.setCheckpointInterval(interval);
                return this;
        }
-       
+
        /**
         * Enables checkpointing for the streaming job. The distributed state of the streaming
         * dataflow will be periodically snapshotted. In case of a failure, the streaming
@@ -338,7 +338,7 @@ public abstract class StreamExecutionEnvironment {
         * <p>NOTE: Checkpointing iterative streaming dataflows is not properly supported at
         * the moment. If the "force" parameter is set to true, the system will execute the
         * job nonetheless.</p>
-        * 
+        *
         * @param interval
         *            Time interval between state checkpoints in millis.
         * @param mode
@@ -367,9 +367,9 @@ public abstract class StreamExecutionEnvironment {
         *
         * <p>NOTE: Checkpointing iterative streaming dataflows is not properly supported at
         * the moment. For that reason, iterative jobs will not be started if used
-        * with enabled checkpointing. To override this mechanism, use the 
+        * with enabled checkpointing. To override this mechanism, use the
         * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.</p>
-        * 
+        *
         * @deprecated Use {@link #enableCheckpointing(long)} instead.
         */
        @Deprecated
@@ -381,7 +381,7 @@ public abstract class StreamExecutionEnvironment {
 
        /**
         * Returns the checkpointing interval or -1 if checkpointing is disabled.
-        * 
+        *
         * <p>Shorthand for {@code getCheckpointConfig().getCheckpointInterval()}.
         *
         * @return The checkpointing interval or -1
@@ -402,9 +402,9 @@ public abstract class StreamExecutionEnvironment {
 
        /**
         * Returns the checkpointing mode (exactly-once vs. at-least-once).
-        * 
+        *
         * <p>Shorthand for {@code getCheckpointConfig().getCheckpointingMode()}.
-        * 
+        *
         * @return The checkpointing mode.
         */
        public CheckpointingMode getCheckpointingMode() {
@@ -422,7 +422,7 @@ public abstract class StreamExecutionEnvironment {
         * <p>The {@link org.apache.flink.runtime.state.memory.MemoryStateBackend} for example
         * maintains the state in heap memory, as objects. It is lightweight without extra dependencies,
         * but can checkpoint only small states (some counters).
-        * 
+        *
         * <p>In contrast, the {@link org.apache.flink.runtime.state.filesystem.FsStateBackend}
         * stores checkpoints of the state (also maintained as heap objects) in files. When using a replicated
         * file system (like HDFS, S3, MapR FS, Tachyon, etc) this will guarantee that state is not lost upon
@@ -430,7 +430,7 @@ public abstract class StreamExecutionEnvironment {
         * consistent (assuming that Flink is run in high-availability mode).
         *
         * @return This StreamExecutionEnvironment itself, to allow chaining of function calls.
-        * 
+        *
         * @see #getStateBackend()
         */
        @PublicEvolving
@@ -442,7 +442,7 @@ public abstract class StreamExecutionEnvironment {
        /**
         * Returns the state backend that defines how to store and checkpoint state.
         * @return The state backend that defines how to store and checkpoint state.
-        * 
+        *
         * @see #setStateBackend(AbstractStateBackend)
         */
        @PublicEvolving
@@ -547,8 +547,7 @@ public abstract class StreamExecutionEnvironment {
         * @param serializerClass
         *              The class of the serializer to use.
         */
-       public void addDefaultKryoSerializer(Class<?> type,
-                       Class<? extends Serializer<?>> serializerClass) {
+       public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) {
                config.addDefaultKryoSerializer(type, serializerClass);
        }
 
@@ -617,7 +616,7 @@ public abstract class StreamExecutionEnvironment {
         * If you set the characteristic to IngestionTime or EventTime this will set a default
         * watermark update interval of 200 ms. If this is not applicable for your application
         * you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
-        * 
+        *
         * @param characteristic The time characteristic.
         */
        @PublicEvolving
@@ -641,7 +640,7 @@ public abstract class StreamExecutionEnvironment {
        public TimeCharacteristic getStreamTimeCharacteristic() {
                return timeCharacteristic;
        }
-       
+
        // --------------------------------------------------------------------------------------------
        // Data stream creations
        // --------------------------------------------------------------------------------------------
@@ -694,15 +693,15 @@ public abstract class StreamExecutionEnvironment {
                }
                catch (Exception e) {
                        throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName()
-                               + "; please specify the TypeInformation manually via "
-                               + "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)");
+                                       + "; please specify the TypeInformation manually via "
+                                       + "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)");
                }
                return fromCollection(Arrays.asList(data), typeInfo);
        }
 
        /**
-        * Creates a new data set that contains the given elements. The framework will determine the type according to the 
-        * based type user supplied. The elements should be the same or be the subclass to the based type. 
+        * Creates a new data set that contains the given elements. The framework will determine the type according to the
+        * base type supplied by the user. The elements must have the same type as, or be a subclass of, the base type.
         * The sequence of elements must not be empty.
         * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
         * degree of parallelism one.
@@ -776,7 +775,7 @@ public abstract class StreamExecutionEnvironment {
 
        /**
         * Creates a data stream from the given non-empty collection.
-        * 
+        *
         * <p>Note that this operation will result in a non-parallel data stream source,
         * i.e., a data stream source with a parallelism of one.</p>
         *
@@ -790,7 +789,7 @@ public abstract class StreamExecutionEnvironment {
         */
        public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo) {
                Preconditions.checkNotNull(data, "Collection must not be null");
-               
+
                // must not have null elements and mixed elements
                FromElementsFunction.checkCollection(data, typeInfo.getTypeClass());
 
@@ -806,11 +805,11 @@ public abstract class StreamExecutionEnvironment {
 
        /**
         * Creates a data stream from the given iterator.
-        * 
+        *
         * <p>Because the iterator will remain unmodified until the actual execution happens,
         * the type of data returned by the iterator must be given explicitly in the form of the type
         * class (this is due to the fact that the Java compiler erases the generic type information).</p>
-        * 
+        *
         * <p>Note that this operation will result in a non-parallel data stream source, i.e.,
         * a data stream source with a parallelism of one.</p>
         *
@@ -829,13 +828,13 @@ public abstract class StreamExecutionEnvironment {
 
        /**
         * Creates a data stream from the given iterator.
-        * 
+        *
         * <p>Because the iterator will remain unmodified until the actual execution happens,
         * the type of data returned by the iterator must be given explicitly in the form of the type
         * information. This method is useful for cases where the type is generic.
         * In that case, the type class (as given in
         * {@link #fromCollection(java.util.Iterator, Class)}) does not supply all type information.</p>
-        * 
+        *
         * <p>Note that this operation will result in a non-parallel data stream source, i.e.,
         * a data stream source with a parallelism of one.</p>
         *
@@ -857,7 +856,7 @@ public abstract class StreamExecutionEnvironment {
        /**
         * Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the
         * framework to create a parallel data stream source that returns the elements in the iterator.
-        * 
+        *
         * <p>Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the
         * iterator must be given explicitly in the form of the type class (this is due to the fact that the Java compiler
         * erases the generic type information).</p>
@@ -1020,8 +1019,8 @@ public abstract class StreamExecutionEnvironment {
                        typeInformation = TypeExtractor.getInputFormatTypes(inputFormat);
                } catch (Exception e) {
                        throw new InvalidProgramException("The type returned by the input format could not be " +
-                               "automatically determined. Please specify the TypeInformation of the produced type " +
-                               "explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.");
+                                       "automatically determined. Please specify the TypeInformation of the produced type " +
+                                       "explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.");
                }
                return readFile(inputFormat, filePath, watchType, interval, typeInformation);
        }
@@ -1072,8 +1071,8 @@ public abstract class StreamExecutionEnvironment {
                        typeInformation = TypeExtractor.getInputFormatTypes(inputFormat);
                } catch (Exception e) {
                        throw new InvalidProgramException("The type returned by the input format could not be " +
-                               "automatically determined. Please specify the TypeInformation of the produced type " +
-                               "explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.");
+                                       "automatically determined. Please specify the TypeInformation of the produced type " +
+                                       "explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.");
                }
                return readFile(inputFormat, filePath, watchType, interval, typeInformation);
        }
@@ -1094,15 +1093,14 @@ public abstract class StreamExecutionEnvironment {
         *              contents
         *              of files.
         * @return The DataStream containing the given directory.
-        * 
+        *
         * @deprecated Use {@link #readFile(FileInputFormat, String, FileProcessingMode, long)} instead.
         */
        @Deprecated
        @SuppressWarnings("deprecation")
-       public DataStream<String> readFileStream(String filePath, long intervalMillis,
-                                                                                       FileMonitoringFunction.WatchType watchType) {
+       public DataStream<String> readFileStream(String filePath, long intervalMillis, FileMonitoringFunction.WatchType watchType) {
                DataStream<Tuple3<String, Long, Long>> source = addSource(new FileMonitoringFunction(
-                       filePath, intervalMillis, watchType), "Read File Stream source");
+                               filePath, intervalMillis, watchType), "Read File Stream source");
 
                return source.flatMap(new FileReadFunction());
        }
@@ -1172,7 +1170,7 @@ public abstract class StreamExecutionEnvironment {
         *              while
         *              a       negative value ensures retrying forever.
         * @return A data stream containing the strings received from the socket
-        * 
+        *
         * @deprecated Use {@link #socketTextStream(String, int, String, long)} instead.
         */
        @Deprecated
@@ -1205,7 +1203,7 @@ public abstract class StreamExecutionEnvironment {
        @PublicEvolving
        public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry) {
                return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
-                       "Socket Stream");
+                               "Socket Stream");
        }
 
        /**
@@ -1220,7 +1218,7 @@ public abstract class StreamExecutionEnvironment {
         * @param delimiter
         *              A character which splits received strings into records
         * @return A data stream containing the strings received from the socket
-        * 
+        *
         * @deprecated Use {@link #socketTextStream(String, int, String)} instead.
         */
        @Deprecated
@@ -1323,9 +1321,9 @@ public abstract class StreamExecutionEnvironment {
                if (inputFormat instanceof FileInputFormat) {
                        @SuppressWarnings("unchecked")
                        FileInputFormat<OUT> format = (FileInputFormat<OUT>) inputFormat;
-                       
+
                        source = createFileInput(format, typeInfo, "Custom File source",
-                               FileProcessingMode.PROCESS_ONCE, -1);
+                                       FileProcessingMode.PROCESS_ONCE, -1);
                } else {
                        source = createInput(inputFormat, typeInfo, "Custom Source");
                }
@@ -1352,18 +1350,18 @@ public abstract class StreamExecutionEnvironment {
                Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");

                Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) ||
-                       interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
-                       "The path monitoring interval cannot be less than " +
-                               ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");
+                                               interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
+                               "The path monitoring interval cannot be less than " +
+                                               ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");

                ContinuousFileMonitoringFunction<OUT> monitoringFunction = new ContinuousFileMonitoringFunction<>(
-                       inputFormat, inputFormat.getFilePath().toString(),
-                       monitoringMode, getParallelism(), interval);
+                               inputFormat, inputFormat.getFilePath().toString(),
+                               monitoringMode, getParallelism(), interval);

                ContinuousFileReaderOperator<OUT, ?> reader = new ContinuousFileReaderOperator<>(inputFormat);

                SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName)
-                       .transform("FileSplitReader_" + sourceName, typeInfo, reader);
+                               .transform("FileSplitReader_" + sourceName, typeInfo, reader);

                return new DataStreamSource<>(source);
        }
@@ -1592,7 +1590,7 @@ public abstract class StreamExecutionEnvironment {
                // because the streaming project depends on "flink-clients" (and not the other way around)
                // we currently need to intercept the data set environment and create a dependent stream env.
                // this should be fixed once we rework the project dependencies
-               
+
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                if (env instanceof ContextEnvironment) {
                        return new StreamContextEnvironment((ContextEnvironment) env);
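
For reference, the checkpointing entry points touched in this file are used
like this (a sketch, assuming a job that builds its topology on `env`):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Exactly-once checkpoints every 5 seconds (the default mode)...
    env.enableCheckpointing(5000);
    // ...or at-least-once, which can reduce checkpointing overhead.
    env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);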

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 8e807db..0dd1b37 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -355,7 +355,6 @@ public class StreamConfig implements Serializable {
                        return DEFAULT_CHECKPOINTING_MODE; 
                }
        }
-       
 
        public void setOutEdgesInOrder(List<StreamEdge> outEdgeList) {
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index a024895..0b7dc2a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -20,9 +20,7 @@ package org.apache.flink.streaming.api.graph;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
-
 import org.apache.commons.lang3.StringUtils;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -36,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -51,7 +50,6 @@ import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -493,10 +491,23 @@ public class StreamingJobGraphGenerator {
                                ackVertices.add(vertex.getID());
                        }
 
+                       ExternalizedCheckpointSettings externalizedCheckpointSettings;
+                       if (cfg.isExternalizedCheckpointsEnabled()) {
+                               CheckpointConfig.ExternalizedCheckpointCleanup cleanup = cfg.getExternalizedCheckpointCleanup();
+                               // Sanity check
+                               if (cleanup == null) {
+                                       throw new IllegalStateException("Externalized checkpoints enabled, but no cleanup mode configured.");
+                               }
+                               externalizedCheckpointSettings = ExternalizedCheckpointSettings.externalizeCheckpoints(cleanup.deleteOnCancellation());
+                       } else {
+                               externalizedCheckpointSettings = ExternalizedCheckpointSettings.none();
+                       }
+
                        JobSnapshottingSettings settings = new JobSnapshottingSettings(
                                        triggerVertices, ackVertices, commitVertices, interval,
                                        cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
-                                       cfg.getMaxConcurrentCheckpoints());
+                                       cfg.getMaxConcurrentCheckpoints(),
+                                       externalizedCheckpointSettings);
                        jobGraph.setSnapshotSettings(settings);

                        // check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy
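
Condensed, the translation above amounts to the following (an editorial
sketch; `toSettings` is a hypothetical helper name, not code from this commit):

    static ExternalizedCheckpointSettings toSettings(CheckpointConfig cfg) {
            // Map the user-facing cleanup mode onto the runtime settings object.
            return cfg.isExternalizedCheckpointsEnabled()
                    ? ExternalizedCheckpointSettings.externalizeCheckpoints(
                            cfg.getExternalizedCheckpointCleanup().deleteOnCancellation())
                    : ExternalizedCheckpointSettings.none();
    }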

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 848a579..48d720a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -52,6 +51,7 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
@@ -96,8 +96,7 @@ public class RescalingITCase extends TestLogger {
 
                config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
                config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
-               config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
-               config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString());
+               config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString());
 
                cluster = new TestingCluster(config);
                cluster.start();
@@ -167,7 +166,7 @@ public class RescalingITCase extends TestLogger {
                        // clear the CollectionSink set for the restarted job
                        CollectionSink.clearElementsSet();
 
-                       Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID), deadline.timeLeft());
+                       Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft());

                        final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess)
                                Await.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
@@ -256,8 +255,7 @@ public class RescalingITCase extends TestLogger {
                        StateSourceBase.workStartedLatch.await();
 
                        while (deadline.hasTimeLeft()) {
-
-                               Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID), deadline.timeLeft());
+                               Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft());
                                FiniteDuration waitingTime = new FiniteDuration(10, TimeUnit.SECONDS);
                                savepointResponse = Await.result(savepointPathFuture, waitingTime);
 
@@ -378,7 +376,7 @@ public class RescalingITCase extends TestLogger {
                        // clear the CollectionSink set for the restarted job
                        CollectionSink.clearElementsSet();
 
-                       Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID), deadline.timeLeft());
+                       Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft());

                        final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess)
                                Await.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
@@ -487,7 +485,7 @@ public class RescalingITCase extends TestLogger {
 
                        while (deadline.hasTimeLeft()) {
 
-                               Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID), deadline.timeLeft());
+                               Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft());
                                FiniteDuration waitingTime = new FiniteDuration(10, TimeUnit.SECONDS);
                                savepointResponse = Await.result(savepointPathFuture, waitingTime);
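
The updated calls reflect the new TriggerSavepoint signature, which now carries
an optional target directory. A sketch of both variants (the explicit-directory
call is an assumption based on the Option parameter, not shown in this diff):

    // Fall back to the configured default savepoint directory...
    jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft());
    // ...or (presumably) direct this one savepoint somewhere else.
    jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.apply("file:///tmp/savepoints")), deadline.timeLeft());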
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 7409fe7..fc2835d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -32,7 +32,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
@@ -51,7 +50,6 @@ import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestSavepoint;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseSavepoint;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
@@ -68,6 +66,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
@@ -83,7 +82,6 @@ import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
 import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -158,8 +156,7 @@ public class SavepointITCase extends TestLogger {
                        config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
                        config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
                                        checkpointDir.toURI().toString());
-                       config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
-                       config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY,
+                       config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
                                        savepointDir.toURI().toString());
 
                        LOG.info("Flink configuration: " + config + ".");
@@ -202,12 +199,20 @@ public class SavepointITCase extends TestLogger {
                        LOG.info("Triggering a savepoint.");
 
                        Future<Object> savepointPathFuture = jobManager.ask(
-                                       new TriggerSavepoint(jobId), deadline.timeLeft());
+                                       new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());

                        final String savepointPath = ((TriggerSavepointSuccess) Await
                                        .result(savepointPathFuture, deadline.timeLeft())).savepointPath();
                        LOG.info("Retrieved savepoint path: " + savepointPath + ".");

+                       // Only one savepoint should exist
+                       File[] files = savepointDir.listFiles();
+                       if (files != null) {
+                               assertEquals("Savepoint not created in expected directory", 1, files.length);
+                       } else {
+                               fail("Savepoint not created in expected directory");
+                       }
+
                        // Retrieve the savepoint from the testing job manager
                        LOG.info("Requesting the savepoint.");
                        Future<Object> savepointFuture = jobManager.ask(
@@ -226,7 +231,7 @@ public class SavepointITCase extends TestLogger {
 
                        // Only one checkpoint of the savepoint should exist
                        String errMsg = "Checkpoints directory not cleaned up properly.";
-                       File[] files = checkpointDir.listFiles();
+                       files = checkpointDir.listFiles();
                        if (files != null) {
                                assertEquals(errMsg, 1, files.length);
                        }
@@ -396,156 +401,6 @@ public class SavepointITCase extends TestLogger {
                }
        }
 
-       /**
-        * Tests that a job manager backed savepoint is removed when the checkpoint
-        * coordinator is shut down, because the associated checkpoints files will
-        * linger around otherwise.
-        */
-       @Test
-       public void testCheckpointsRemovedWithJobManagerBackendOnShutdown() throws Exception {
-               // Config
-               int numTaskManagers = 2;
-               int numSlotsPerTaskManager = 2;
-               int parallelism = numTaskManagers * numSlotsPerTaskManager;
-
-               // Test deadline
-               final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
-
-               // The number of checkpoints to complete before triggering the savepoint
-               final int numberOfCompletedCheckpoints = 10;
-
-               // Temporary directory for file state backend
-               final File tmpDir = CommonTestUtils.createTempDirectory();
-
-               LOG.info("Created temporary directory: " + tmpDir + ".");
-
-               TestingCluster flink = null;
-               List<File> checkpointFiles = new ArrayList<>();
-
-               try {
-                       // Flink configuration
-                       final Configuration config = new Configuration();
-                       config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
-                       config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
-
-                       final File checkpointDir = new File(tmpDir, "checkpoints");
-
-                       if (!checkpointDir.mkdir()) {
-                               fail("Test setup failed: failed to create temporary directories.");
-                       }
-
-                       LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
-
-                       config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "jobmanager");
-                       config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
-                       config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
-                                       checkpointDir.toURI().toString());
-
-                       LOG.info("Flink configuration: " + config + ".");
-
-                       // Start Flink
-                       flink = new TestingCluster(config);
-                       LOG.info("Starting Flink cluster.");
-                       flink.start();
-
-                       // Retrieve the job manager
-                       LOG.info("Retrieving JobManager.");
-                       ActorGateway jobManager = Await.result(
-                                       flink.leaderGateway().future(),
-                                       deadline.timeLeft());
-                       LOG.info("JobManager: " + jobManager + ".");
-
-                       // Submit the job
-                       final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000, 1000);
-                       final JobID jobId = jobGraph.getJobID();
-
-                       // Wait for the source to be notified about the expected number
-                       // of completed checkpoints
-                       InfiniteTestSource.CheckpointCompleteLatch = new CountDownLatch(
-                                       numberOfCompletedCheckpoints);
-
-                       LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
-
-                       flink.submitJobDetached(jobGraph);
-
-                       LOG.info("Waiting for " + numberOfCompletedCheckpoints +
-                                       " checkpoint complete notifications.");
-
-                       // Wait...
-                       InfiniteTestSource.CheckpointCompleteLatch.await();
-
-                       LOG.info("Received all " + numberOfCompletedCheckpoints +
-                                       " checkpoint complete notifications.");
-
-                       // ...and then trigger the savepoint
-                       LOG.info("Triggering a savepoint.");
-
-                       Future<Object> savepointPathFuture = jobManager.ask(
-                                       new TriggerSavepoint(jobId), deadline.timeLeft());
-
-                       final String savepointPath = ((TriggerSavepointSuccess) Await
-                                       .result(savepointPathFuture, deadline.timeLeft())).savepointPath();
-                       LOG.info("Retrieved savepoint path: " + savepointPath + ".");
-
-                       // Retrieve the savepoint from the testing job manager
-                       LOG.info("Requesting the savepoint.");
-                       Future<Object> savepointFuture = jobManager.ask(
-                                       new RequestSavepoint(savepointPath),
-                                       deadline.timeLeft());
-
-                       SavepointV1 savepoint = (SavepointV1) ((ResponseSavepoint) Await.result(
-                                       savepointFuture, deadline.timeLeft())).savepoint();
-                       LOG.info("Retrieved savepoint: " + savepointPath + ".");
-
-                       // Check that all checkpoint files have been removed
-                       for (TaskState stateForTaskGroup : savepoint.getTaskStates()) {
-                               for (SubtaskState subtaskState : stateForTaskGroup.getStates()) {
-                                       ChainedStateHandle<StreamStateHandle> streamTaskState = subtaskState.getChainedStateHandle();
-
-                                       for (int i = 0; i < streamTaskState.getLength(); i++) {
-                                               if (streamTaskState.get(i) != null) {
-                                                       FileStateHandle fileStateHandle = (FileStateHandle) streamTaskState.get(i);
-                                                       checkpointFiles.add(new File(fileStateHandle.getFilePath().toUri()));
-                                               }
-                                       }
-                               }
-                       }
-
-                       // Cancel the job
-                       LOG.info("Cancelling job " + jobId + ".");
-                       Future<Object> cancelRespFuture = jobManager.ask(
-                                       new CancelJob(jobId), deadline.timeLeft());
-                       assertTrue(Await.result(cancelRespFuture, deadline.timeLeft())
-                                       instanceof CancellationSuccess);
-
-                       LOG.info("Waiting for job " + jobId + " to be removed.");
-                       Future<Object> removedRespFuture = jobManager.ask(
-                                       new NotifyWhenJobRemoved(jobId), deadline.timeLeft());
-                       assertTrue((Boolean) Await.result(removedRespFuture, deadline.timeLeft()));
-               }
-               finally {
-                       if (flink != null) {
-                               flink.shutdown();
-                       }
-
-                       Thread.sleep(1000);
-
-                       // At least one checkpoint file
-                       assertTrue(checkpointFiles.toString(), checkpointFiles.size() > 0);
-
-                       // The checkpoint associated with the savepoint should have been
-                       // discarded after shutdown
-                       for (File f : checkpointFiles) {
-                               String errMsg = "Checkpoint file " + f + " not cleaned up properly.";
-                               assertFalse(errMsg, f.exists());
-                       }
-
-                       if (tmpDir != null) {
-                               FileUtils.deleteDirectory(tmpDir);
-                       }
-               }
-       }
-
        @Test
        public void testSubmitWithUnknownSavepointPath() throws Exception {
                // Config
@@ -556,6 +411,9 @@ public class SavepointITCase extends TestLogger {
                // Test deadline
                final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
 
+               final File tmpDir = CommonTestUtils.createTempDirectory();
+               final File savepointDir = new File(tmpDir, "savepoints");
+
                TestingCluster flink = null;
 
                try {
@@ -563,6 +421,8 @@ public class SavepointITCase extends TestLogger {
                        final Configuration config = new Configuration();
                        config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
                        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
+                       config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
+                                       savepointDir.toURI().toString());
 
                        LOG.info("Flink configuration: " + config + ".");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 65da33f..9aaf116 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -26,7 +26,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -47,6 +46,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
@@ -102,8 +102,7 @@ public class ClassLoaderITCase extends TestLogger {
                                FOLDER.newFolder().getAbsoluteFile().toURI().toString());

                // Savepoint path
-               config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem");
-               config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY,
+               config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
                                FOLDER.newFolder().getAbsoluteFile().toURI().toString());
 
                testCluster = new TestingCluster(config, false);
@@ -296,7 +295,7 @@ public class ClassLoaderITCase extends TestLogger {
                String savepointPath = null;
                for (int i = 0; i < 20; i++) {
                        LOG.info("Triggering savepoint (" + (i+1) + "/20).");
-                       Future<Object> savepointFuture = jm.ask(new TriggerSavepoint(jobId), deadline.timeLeft());
+                       Future<Object> savepointFuture = jm.ask(new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());

                        Object savepointResponse = Await.result(savepointFuture, deadline.timeLeft());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
index 7ca9c3e..aef2604 100644
--- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
+++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala
@@ -64,7 +64,6 @@ class TestingYarnJobManager(
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
     checkpointRecoveryFactory : CheckpointRecoveryFactory,
-    savepointStore: SavepointStore,
     jobRecoveryTimeout: FiniteDuration,
     metricRegistry : Option[MetricRegistry])
   extends YarnJobManager(
@@ -79,7 +78,6 @@ class TestingYarnJobManager(
     leaderElectionService,
     submittedJobGraphs,
     checkpointRecoveryFactory,
-    savepointStore,
     jobRecoveryTimeout,
     metricRegistry)
   with TestingJobManagerLike {}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index b9d52ae..2df78c2 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.{ExecutorService, TimeUnit}
 import akka.actor.ActorRef
 import org.apache.flink.configuration.{ConfigConstants, Configuration => FlinkConfiguration}
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.clusterframework.ContaineredJobManager
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -64,7 +63,6 @@ class YarnJobManager(
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
     checkpointRecoveryFactory : CheckpointRecoveryFactory,
-    savepointStore: SavepointStore,
     jobRecoveryTimeout: FiniteDuration,
     metricsRegistry: Option[MetricRegistry])
   extends ContaineredJobManager(
@@ -79,7 +77,6 @@ class YarnJobManager(
     leaderElectionService,
     submittedJobGraphs,
     checkpointRecoveryFactory,
-    savepointStore,
     jobRecoveryTimeout,
     metricsRegistry) {
 
