Repository: flink
Updated Branches:
  refs/heads/master c7d1a3b8d -> fd410d9f6
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java index cdeef94..0a7f65e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java @@ -18,11 +18,14 @@ package org.apache.flink.streaming.api.environment; -import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.streaming.api.CheckpointingMode; import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * Configuration that captures all checkpointing related settings. @@ -64,6 +67,9 @@ public class CheckpointConfig implements java.io.Serializable { /** Flag to force checkpointing in iterative jobs */ private boolean forceCheckpointing; + /** Cleanup behaviour for persistent checkpoints. */ + private ExternalizedCheckpointCleanup externalizedCheckpointCleanup; + // ------------------------------------------------------------------------ /** @@ -223,4 +229,99 @@ public class CheckpointConfig implements java.io.Serializable { public void setForceCheckpointing(boolean forceCheckpointing) { this.forceCheckpointing = forceCheckpointing; } + + /** + * Enables checkpoints to be persisted externally. + * + * <p>Externalized checkpoints write their meta data out to persistent + * storage and are <strong>not</strong> automatically cleaned up when + * the owning job fails (terminating with job status {@link JobStatus#FAILED}). + * In this case, you have to manually clean up the checkpoint state, both + * the meta data and actual program state. + * + * <p>The {@link ExternalizedCheckpointCleanup} mode defines how an + * externalized checkpoint should be cleaned up on job cancellation. If you + * choose to retain externalized checkpoints on cancellation you have to + * handle checkpoint cleanup manually when you cancel the job as well + * (terminating with job status {@link JobStatus#CANCELED}). + * + * <p>The target directory for externalized checkpoints is configured + * via {@link ConfigConstants#CHECKPOINTS_DIRECTORY_KEY}. + * + * @param cleanupMode Externalized checkpoint cleanup behaviour. + */ + @PublicEvolving + public void enableExternalizedCheckpoints(ExternalizedCheckpointCleanup cleanupMode) { + this.externalizedCheckpointCleanup = checkNotNull(cleanupMode); + } + + /** + * Returns whether checkpoints should be persisted externally. + * + * @return <code>true</code> if checkpoints should be externalized. + */ + @PublicEvolving + public boolean isExternalizedCheckpointsEnabled() { + return externalizedCheckpointCleanup != null; + } + + /** + * Returns the cleanup behaviour for externalized checkpoints. + * + * @return The cleanup behaviour for externalized checkpoints or + * <code>null</code> if none is configured.
+ */ + @PublicEvolving + public ExternalizedCheckpointCleanup getExternalizedCheckpointCleanup() { + return externalizedCheckpointCleanup; + } + + /** + * Cleanup behaviour for externalized checkpoints when the job is cancelled. + */ + @PublicEvolving + public enum ExternalizedCheckpointCleanup { + + /** + * Delete externalized checkpoints on job cancellation. + * + * <p>All checkpoint state will be deleted when you cancel the owning + * job, both the meta data and actual program state. Therefore, you + * cannot resume from externalized checkpoints after the job has been + * cancelled. + * + * <p>Note that checkpoint state is always kept if the job terminates + * with state {@link JobStatus#FAILED}. + */ + DELETE_ON_CANCELLATION(true), + + /** + * Retain externalized checkpoints on job cancellation. + * + * <p>All checkpoint state is kept when you cancel the owning job. You + * have to manually delete both the checkpoint meta data and actual + * program state after cancelling the job. + * + * <p>Note that checkpoint state is always kept if the job terminates + * with state {@link JobStatus#FAILED}. + */ + RETAIN_ON_CANCELLATION(false); + + private final boolean deleteOnCancellation; + + ExternalizedCheckpointCleanup(boolean deleteOnCancellation) { + this.deleteOnCancellation = deleteOnCancellation; + } + + /** + * Returns whether persistent checkpoints shall be discarded on + * cancellation of the job. + * + * @return <code>true</code> if persistent checkpoints shall be discarded + * on cancellation of the job. + */ + public boolean deleteOnCancellation() { + return deleteOnCancellation; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 8fc2872..e1cfc60 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -89,7 +89,7 @@ import java.util.List; * The StreamExecutionEnvironment is the context in which a streaming program is executed. A * {@link LocalStreamEnvironment} will cause execution in the current JVM, a * {@link RemoteStreamEnvironment} will cause execution on a remote setup. - * + * * <p>The environment provides methods to control the job execution (such as setting the parallelism * or the fault tolerance/checkpointing parameters) and to interact with the outside world (data access). 
* @@ -101,7 +101,7 @@ public abstract class StreamExecutionEnvironment { /** The default name to use for a streaming job if no other name has been specified */ public static final String DEFAULT_JOB_NAME = "Flink Streaming Job"; - + /** The time characteristic that is used if none other is set */ private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime; @@ -113,28 +113,28 @@ public abstract class StreamExecutionEnvironment { /** The default parallelism used when creating a local environment */ private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors(); - + // ------------------------------------------------------------------------ /** The execution configuration for this environment */ private final ExecutionConfig config = new ExecutionConfig(); - - /** Settings that control the checkpointing behavior */ + + /** Settings that control the checkpointing behavior */ private final CheckpointConfig checkpointCfg = new CheckpointConfig(); - + protected final List<StreamTransformation<?>> transformations = new ArrayList<>(); - + private long bufferTimeout = DEFAULT_NETWORK_BUFFER_TIMEOUT; - + protected boolean isChainingEnabled = true; - + /** The state backend used for storing k/v state and state snapshots */ private AbstractStateBackend defaultStateBackend; - + /** The time characteristic used by the data streams */ private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC; - + // -------------------------------------------------------------------------------------------- // Constructor and Properties // -------------------------------------------------------------------------------------------- @@ -274,7 +274,7 @@ public abstract class StreamExecutionEnvironment { /** * Gets the checkpoint config, which defines values like checkpoint interval, delay between * checkpoints, etc. - * + * * @return The checkpoint config. */ public CheckpointConfig getCheckpointConfig() { @@ -286,13 +286,13 @@ public abstract class StreamExecutionEnvironment { * dataflow will be periodically snapshotted. In case of a failure, the streaming * dataflow will be restarted from the latest completed checkpoint. This method selects * {@link CheckpointingMode#EXACTLY_ONCE} guarantees. - * + * * <p>The job draws checkpoints periodically, in the given interval. The state will be * stored in the configured state backend.</p> - * + * * <p>NOTE: Checkpointing iterative streaming dataflows is not properly supported at * the moment. For that reason, iterative jobs will not be started if used - * with enabled checkpointing. To override this mechanism, use the + * with enabled checkpointing. To override this mechanism, use the * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.</p> * * @param interval Time interval between state checkpoints in milliseconds. @@ -313,12 +313,12 @@ public abstract class StreamExecutionEnvironment { * * <p>NOTE: Checkpointing iterative streaming dataflows is not properly supported at * the moment. For that reason, iterative jobs will not be started if used - * with enabled checkpointing. To override this mechanism, use the + * with enabled checkpointing. To override this mechanism, use the * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.</p> * - * @param interval + * @param interval * Time interval between state checkpoints in milliseconds. - * @param mode + * @param mode * The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
*/ public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode) { @@ -326,7 +326,7 @@ public abstract class StreamExecutionEnvironment { checkpointCfg.setCheckpointInterval(interval); return this; } - + /** * Enables checkpointing for the streaming job. The distributed state of the streaming * dataflow will be periodically snapshotted. In case of a failure, the streaming @@ -338,7 +338,7 @@ public abstract class StreamExecutionEnvironment { * <p>NOTE: Checkpointing iterative streaming dataflows is not properly supported at * the moment. If the "force" parameter is set to true, the system will execute the * job nonetheless.</p> - + * @param interval * Time interval between state checkpoints in millis. * @param mode @@ -367,9 +367,9 @@ public abstract class StreamExecutionEnvironment { * * <p>NOTE: Checkpointing iterative streaming dataflows is not properly supported at * the moment. For that reason, iterative jobs will not be started if used - * with enabled checkpointing. To override this mechanism, use the + * with enabled checkpointing. To override this mechanism, use the * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.</p> - * + * * @deprecated Use {@link #enableCheckpointing(long)} instead. */ @Deprecated @@ -381,7 +381,7 @@ public abstract class StreamExecutionEnvironment { /** * Returns the checkpointing interval or -1 if checkpointing is disabled. - * + * * <p>Shorthand for {@code getCheckpointConfig().getCheckpointInterval()}. * * @return The checkpointing interval or -1 @@ -402,9 +402,9 @@ public abstract class StreamExecutionEnvironment { /** * Returns the checkpointing mode (exactly-once vs. at-least-once). - * + * * <p>Shorthand for {@code getCheckpointConfig().getCheckpointingMode()}. - * + * * @return The checkpointing mode. */ public CheckpointingMode getCheckpointingMode() { @@ -422,7 +422,7 @@ public abstract class StreamExecutionEnvironment { * <p>The {@link org.apache.flink.runtime.state.memory.MemoryStateBackend} for example * maintains the state in heap memory, as objects. It is lightweight without extra dependencies, * but can checkpoint only small states (some counters). - * + * * <p>In contrast, the {@link org.apache.flink.runtime.state.filesystem.FsStateBackend} * stores checkpoints of the state (also maintained as heap objects) in files. When using a replicated * file system (like HDFS, S3, MapR FS, Tachyon, etc) this will guarantee that state is not lost upon @@ -430,7 +430,7 @@ * consistent (assuming that Flink is run in high-availability mode). * * @return This StreamExecutionEnvironment itself, to allow chaining of function calls. - * + * * @see #getStateBackend() */ @PublicEvolving @@ -442,7 +442,7 @@ public abstract class StreamExecutionEnvironment { /** * Returns the state backend that defines how to store and checkpoint state. * @return The state backend that defines how to store and checkpoint state. - * + * * @see #setStateBackend(AbstractStateBackend) */ @PublicEvolving @@ -547,8 +547,7 @@ public abstract class StreamExecutionEnvironment { * @param serializerClass * The class of the serializer to use. */ - public void addDefaultKryoSerializer(Class<?> type, - Class<? extends Serializer<?>> serializerClass) { + public void addDefaultKryoSerializer(Class<?> type, Class<?
extends Serializer<?>> serializerClass) { config.addDefaultKryoSerializer(type, serializerClass); } @@ -617,7 +616,7 @@ public abstract class StreamExecutionEnvironment { * If you set the characteristic to IngestionTime or EventTime this will set a default * watermark update interval of 200 ms. If this is not applicable for your application * you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}. - * + * * @param characteristic The time characteristic. */ @PublicEvolving @@ -641,7 +640,7 @@ public abstract class StreamExecutionEnvironment { public TimeCharacteristic getStreamTimeCharacteristic() { return timeCharacteristic; } - + // -------------------------------------------------------------------------------------------- // Data stream creations // -------------------------------------------------------------------------------------------- @@ -694,15 +693,15 @@ public abstract class StreamExecutionEnvironment { } catch (Exception e) { throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName() - + "; please specify the TypeInformation manually via " - + "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)"); + + "; please specify the TypeInformation manually via " + + "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)"); } return fromCollection(Arrays.asList(data), typeInfo); } /** - * Creates a new data set that contains the given elements. The framework will determine the type according to the - * based type user supplied. The elements should be the same or be the subclass to the based type. + * Creates a new data set that contains the given elements. The framework will determine the type according to the + * based type user supplied. The elements should be the same or be the subclass to the based type. * The sequence of elements must not be empty. * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a * degree of parallelism one. @@ -776,7 +775,7 @@ public abstract class StreamExecutionEnvironment { /** * Creates a data stream from the given non-empty collection. - * + * * <p>Note that this operation will result in a non-parallel data stream source, * i.e., a data stream source with a parallelism of one.</p> * @@ -790,7 +789,7 @@ public abstract class StreamExecutionEnvironment { */ public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo) { Preconditions.checkNotNull(data, "Collection must not be null"); - + // must not have null elements and mixed elements FromElementsFunction.checkCollection(data, typeInfo.getTypeClass()); @@ -806,11 +805,11 @@ public abstract class StreamExecutionEnvironment { /** * Creates a data stream from the given iterator. - + * <p>Because the iterator will remain unmodified until the actual execution happens, * the type of data returned by the iterator must be given explicitly in the form of the type * class (this is due to the fact that the Java compiler erases the generic type information).</p> - + * <p>Note that this operation will result in a non-parallel data stream source, i.e., * a data stream source with a parallelism of one.</p> * @@ -829,13 +828,13 @@ public abstract class StreamExecutionEnvironment { /** * Creates a data stream from the given iterator.
- * + * <p>Because the iterator will remain unmodified until the actual execution happens, * the type of data returned by the iterator must be given explicitly in the form of the type * information. This method is useful for cases where the type is generic. * In that case, the type class (as given in * {@link #fromCollection(java.util.Iterator, Class)}) does not supply all type information.</p> - * + * * <p>Note that this operation will result in a non-parallel data stream source, i.e., * a data stream source with a parallelism of one.</p> * @@ -857,7 +856,7 @@ public abstract class StreamExecutionEnvironment { /** * Creates a new data stream that contains elements in the iterator. The iterator is splittable, allowing the * framework to create a parallel data stream source that returns the elements in the iterator. - * + * * <p>Because the iterator will remain unmodified until the actual execution happens, the type of data returned by the * iterator must be given explicitly in the form of the type class (this is due to the fact that the Java compiler * erases the generic type information).</p> @@ -1020,8 +1019,8 @@ public abstract class StreamExecutionEnvironment { typeInformation = TypeExtractor.getInputFormatTypes(inputFormat); } catch (Exception e) { throw new InvalidProgramException("The type returned by the input format could not be " + - "automatically determined. Please specify the TypeInformation of the produced type " + - "explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead."); + "automatically determined. Please specify the TypeInformation of the produced type " + + "explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead."); } return readFile(inputFormat, filePath, watchType, interval, typeInformation); } @@ -1072,8 +1071,8 @@ public abstract class StreamExecutionEnvironment { typeInformation = TypeExtractor.getInputFormatTypes(inputFormat); } catch (Exception e) { throw new InvalidProgramException("The type returned by the input format could not be " + - "automatically determined. Please specify the TypeInformation of the produced type " + - "explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead."); + "automatically determined. Please specify the TypeInformation of the produced type " + + "explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead."); } return readFile(inputFormat, filePath, watchType, interval, typeInformation); } @@ -1094,15 +1093,14 @@ public abstract class StreamExecutionEnvironment { * contents * of files. * @return The DataStream containing the given directory. - * + * * @deprecated Use {@link #readFile(FileInputFormat, String, FileProcessingMode, long)} instead. */ @Deprecated @SuppressWarnings("deprecation") - public DataStream<String> readFileStream(String filePath, long intervalMillis, - FileMonitoringFunction.WatchType watchType) { + public DataStream<String> readFileStream(String filePath, long intervalMillis, FileMonitoringFunction.WatchType watchType) { DataStream<Tuple3<String, Long, Long>> source = addSource(new FileMonitoringFunction( - filePath, intervalMillis, watchType), "Read File Stream source"); + filePath, intervalMillis, watchType), "Read File Stream source"); return source.flatMap(new FileReadFunction()); } @@ -1172,7 +1170,7 @@ public abstract class StreamExecutionEnvironment { * while * a negative value ensures retrying forever.
* @return A data stream containing the strings received from the socket - * + * * @deprecated Use {@link #socketTextStream(String, int, String, long)} instead. */ @Deprecated @@ -1205,7 +1203,7 @@ public abstract class StreamExecutionEnvironment { @PublicEvolving public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry) { return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), - "Socket Stream"); + "Socket Stream"); } /** @@ -1220,7 +1218,7 @@ public abstract class StreamExecutionEnvironment { * @param delimiter * A character which splits received strings into records * @return A data stream containing the strings received from the socket - * + * * @deprecated Use {@link #socketTextStream(String, int, String)} instead. */ @Deprecated @@ -1323,9 +1321,9 @@ public abstract class StreamExecutionEnvironment { if (inputFormat instanceof FileInputFormat) { @SuppressWarnings("unchecked") FileInputFormat<OUT> format = (FileInputFormat<OUT>) inputFormat; - + source = createFileInput(format, typeInfo, "Custom File source", - FileProcessingMode.PROCESS_ONCE, -1); + FileProcessingMode.PROCESS_ONCE, -1); } else { source = createInput(inputFormat, typeInfo, "Custom Source"); } @@ -1352,18 +1350,18 @@ public abstract class StreamExecutionEnvironment { Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode."); Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) || - interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL, - "The path monitoring interval cannot be less than " + - ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms."); + interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL, + "The path monitoring interval cannot be less than " + + ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms."); ContinuousFileMonitoringFunction<OUT> monitoringFunction = new ContinuousFileMonitoringFunction<>( - inputFormat, inputFormat.getFilePath().toString(), - monitoringMode, getParallelism(), interval); + inputFormat, inputFormat.getFilePath().toString(), + monitoringMode, getParallelism(), interval); ContinuousFileReaderOperator<OUT, ?> reader = new ContinuousFileReaderOperator<>(inputFormat); SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName) - .transform("FileSplitReader_" + sourceName, typeInfo, reader); + .transform("FileSplitReader_" + sourceName, typeInfo, reader); return new DataStreamSource<>(source); } @@ -1592,7 +1590,7 @@ public abstract class StreamExecutionEnvironment { // because the streaming project depends on "flink-clients" (and not the other way around) // we currently need to intercept the data set environment and create a dependent stream env. 
// this should be fixed once we rework the project dependencies - + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); if (env instanceof ContextEnvironment) { return new StreamContextEnvironment((ContextEnvironment) env); http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index 8e807db..0dd1b37 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -355,7 +355,6 @@ public class StreamConfig implements Serializable { return DEFAULT_CHECKPOINTING_MODE; } } - public void setOutEdgesInOrder(List<StreamEdge> outEdgeList) { try { http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index a024895..0b7dc2a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -20,9 +20,7 @@ package org.apache.flink.streaming.api.graph; import com.google.common.hash.HashFunction; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; - import org.apache.commons.lang3.StringUtils; - import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.restartstrategy.RestartStrategies; @@ -36,6 +34,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; @@ -51,7 +50,6 @@ import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.tasks.StreamIterationHead; import org.apache.flink.streaming.runtime.tasks.StreamIterationTail; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -493,10 +491,23 @@ public class StreamingJobGraphGenerator { ackVertices.add(vertex.getID()); } + ExternalizedCheckpointSettings externalizedCheckpointSettings; + if (cfg.isExternalizedCheckpointsEnabled()) { + CheckpointConfig.ExternalizedCheckpointCleanup cleanup = cfg.getExternalizedCheckpointCleanup(); + // Sanity check + if (cleanup == null) { + throw new IllegalStateException("Externalized checkpoints enabled, but no cleanup mode configured."); + 
} + externalizedCheckpointSettings = ExternalizedCheckpointSettings.externalizeCheckpoints(cleanup.deleteOnCancellation()); + } else { + externalizedCheckpointSettings = ExternalizedCheckpointSettings.none(); + } + JobSnapshottingSettings settings = new JobSnapshottingSettings( triggerVertices, ackVertices, commitVertices, interval, cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(), - cfg.getMaxConcurrentCheckpoints()); + cfg.getMaxConcurrentCheckpoints(), + externalizedCheckpointSettings); jobGraph.setSnapshotSettings(settings); // check if a restart strategy has been set, if not then set the FixedDelayRestartStrategy http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index 848a579..48d720a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -28,7 +28,6 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.instance.ActorGateway; @@ -52,6 +51,7 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import scala.Option; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Deadline; @@ -96,8 +96,7 @@ public class RescalingITCase extends TestLogger { config.setString(ConfigConstants.STATE_BACKEND, "filesystem"); config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString()); - config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem"); - config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString()); + config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString()); cluster = new TestingCluster(config); cluster.start(); @@ -167,7 +166,7 @@ public class RescalingITCase extends TestLogger { // clear the CollectionSink set for the restarted job CollectionSink.clearElementsSet(); - Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID), deadline.timeLeft()); + Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft()); final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) Await.result(savepointPathFuture, deadline.timeLeft())).savepointPath(); @@ -256,8 +255,7 @@ public class RescalingITCase extends TestLogger { StateSourceBase.workStartedLatch.await(); while (deadline.hasTimeLeft()) { - - Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID), deadline.timeLeft()); + Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.<String>empty()), 
deadline.timeLeft()); FiniteDuration waitingTime = new FiniteDuration(10, TimeUnit.SECONDS); savepointResponse = Await.result(savepointPathFuture, waitingTime); @@ -378,7 +376,7 @@ public class RescalingITCase extends TestLogger { // clear the CollectionSink set for the restarted job CollectionSink.clearElementsSet(); - Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID), deadline.timeLeft()); + Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft()); final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) Await.result(savepointPathFuture, deadline.timeLeft())).savepointPath(); @@ -487,7 +485,7 @@ public class RescalingITCase extends TestLogger { while (deadline.hasTimeLeft()) { - Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID), deadline.timeLeft()); + Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft()); FiniteDuration waitingTime = new FiniteDuration(10, TimeUnit.SECONDS); savepointResponse = Await.result(savepointPathFuture, waitingTime); http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 7409fe7..fc2835d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -32,7 +32,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.checkpoint.TaskState; -import org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory; import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.SuppressRestartsException; @@ -51,7 +50,6 @@ import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; import org.apache.flink.runtime.testingUtils.TestingCluster; -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestSavepoint; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseSavepoint; import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; @@ -68,6 +66,7 @@ import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Option; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Deadline; @@ -83,7 +82,6 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess; import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess; 
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -158,8 +156,7 @@ public class SavepointITCase extends TestLogger { config.setString(ConfigConstants.STATE_BACKEND, "filesystem"); config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString()); - config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem"); - config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, + config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString()); LOG.info("Flink configuration: " + config + "."); @@ -202,12 +199,20 @@ public class SavepointITCase extends TestLogger { LOG.info("Triggering a savepoint."); Future<Object> savepointPathFuture = jobManager.ask( - new TriggerSavepoint(jobId), deadline.timeLeft()); + new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft()); final String savepointPath = ((TriggerSavepointSuccess) Await .result(savepointPathFuture, deadline.timeLeft())).savepointPath(); LOG.info("Retrieved savepoint path: " + savepointPath + "."); + // Only one savepoint should exist + File[] files = savepointDir.listFiles(); + if (files != null) { + assertEquals("Savepoint not created in expected directory", 1, files.length); + } else { + fail("Savepoint not created in expected directory"); + } + // Retrieve the savepoint from the testing job manager LOG.info("Requesting the savepoint."); Future<Object> savepointFuture = jobManager.ask( @@ -226,7 +231,7 @@ public class SavepointITCase extends TestLogger { // Only one checkpoint of the savepoint should exist String errMsg = "Checkpoints directory not cleaned up properly."; - File[] files = checkpointDir.listFiles(); + files = checkpointDir.listFiles(); if (files != null) { assertEquals(errMsg, 1, files.length); } @@ -396,156 +401,6 @@ public class SavepointITCase extends TestLogger { } } - /** - * Tests that a job manager backed savepoint is removed when the checkpoint - * coordinator is shut down, because the associated checkpoints files will - * linger around otherwise. 
- */ - @Test - public void testCheckpointsRemovedWithJobManagerBackendOnShutdown() throws Exception { - // Config - int numTaskManagers = 2; - int numSlotsPerTaskManager = 2; - int parallelism = numTaskManagers * numSlotsPerTaskManager; - - // Test deadline - final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow(); - - // The number of checkpoints to complete before triggering the savepoint - final int numberOfCompletedCheckpoints = 10; - - // Temporary directory for file state backend - final File tmpDir = CommonTestUtils.createTempDirectory(); - - LOG.info("Created temporary directory: " + tmpDir + "."); - - TestingCluster flink = null; - List<File> checkpointFiles = new ArrayList<>(); - - try { - // Flink configuration - final Configuration config = new Configuration(); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager); - - final File checkpointDir = new File(tmpDir, "checkpoints"); - - if (!checkpointDir.mkdir()) { - fail("Test setup failed: failed to create temporary directories."); - } - - LOG.info("Created temporary checkpoint directory: " + checkpointDir + "."); - - config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "jobmanager"); - config.setString(ConfigConstants.STATE_BACKEND, "filesystem"); - config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, - checkpointDir.toURI().toString()); - - LOG.info("Flink configuration: " + config + "."); - - // Start Flink - flink = new TestingCluster(config); - LOG.info("Starting Flink cluster."); - flink.start(); - - // Retrieve the job manager - LOG.info("Retrieving JobManager."); - ActorGateway jobManager = Await.result( - flink.leaderGateway().future(), - deadline.timeLeft()); - LOG.info("JobManager: " + jobManager + "."); - - // Submit the job - final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000, 1000); - final JobID jobId = jobGraph.getJobID(); - - // Wait for the source to be notified about the expected number - // of completed checkpoints - InfiniteTestSource.CheckpointCompleteLatch = new CountDownLatch( - numberOfCompletedCheckpoints); - - LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode."); - - flink.submitJobDetached(jobGraph); - - LOG.info("Waiting for " + numberOfCompletedCheckpoints + - " checkpoint complete notifications."); - - // Wait... 
- InfiniteTestSource.CheckpointCompleteLatch.await(); - - LOG.info("Received all " + numberOfCompletedCheckpoints + - " checkpoint complete notifications."); - - // ...and then trigger the savepoint - LOG.info("Triggering a savepoint."); - - Future<Object> savepointPathFuture = jobManager.ask( - new TriggerSavepoint(jobId), deadline.timeLeft()); - - final String savepointPath = ((TriggerSavepointSuccess) Await - .result(savepointPathFuture, deadline.timeLeft())).savepointPath(); - LOG.info("Retrieved savepoint path: " + savepointPath + "."); - - // Retrieve the savepoint from the testing job manager - LOG.info("Requesting the savepoint."); - Future<Object> savepointFuture = jobManager.ask( - new RequestSavepoint(savepointPath), - deadline.timeLeft()); - - SavepointV1 savepoint = (SavepointV1) ((ResponseSavepoint) Await.result( - savepointFuture, deadline.timeLeft())).savepoint(); - LOG.info("Retrieved savepoint: " + savepointPath + "."); - - // Check that all checkpoint files have been removed - for (TaskState stateForTaskGroup : savepoint.getTaskStates()) { - for (SubtaskState subtaskState : stateForTaskGroup.getStates()) { - ChainedStateHandle<StreamStateHandle> streamTaskState = subtaskState.getChainedStateHandle(); - - for (int i = 0; i < streamTaskState.getLength(); i++) { - if (streamTaskState.get(i) != null) { - FileStateHandle fileStateHandle = (FileStateHandle) streamTaskState.get(i); - checkpointFiles.add(new File(fileStateHandle.getFilePath().toUri())); - } - } - } - } - - // Cancel the job - LOG.info("Cancelling job " + jobId + "."); - Future<Object> cancelRespFuture = jobManager.ask( - new CancelJob(jobId), deadline.timeLeft()); - assertTrue(Await.result(cancelRespFuture, deadline.timeLeft()) - instanceof CancellationSuccess); - - LOG.info("Waiting for job " + jobId + " to be removed."); - Future<Object> removedRespFuture = jobManager.ask( - new NotifyWhenJobRemoved(jobId), deadline.timeLeft()); - assertTrue((Boolean) Await.result(removedRespFuture, deadline.timeLeft())); - } - finally { - if (flink != null) { - flink.shutdown(); - } - - Thread.sleep(1000); - - // At least one checkpoint file - assertTrue(checkpointFiles.toString(), checkpointFiles.size() > 0); - - // The checkpoint associated with the savepoint should have been - // discarded after shutdown - for (File f : checkpointFiles) { - String errMsg = "Checkpoint file " + f + " not cleaned up properly."; - assertFalse(errMsg, f.exists()); - } - - if (tmpDir != null) { - FileUtils.deleteDirectory(tmpDir); - } - } - } - @Test public void testSubmitWithUnknownSavepointPath() throws Exception { // Config @@ -556,6 +411,9 @@ public class SavepointITCase extends TestLogger { // Test deadline final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow(); + final File tmpDir = CommonTestUtils.createTempDirectory(); + final File savepointDir = new File(tmpDir, "savepoints"); + TestingCluster flink = null; try { @@ -563,6 +421,8 @@ public class SavepointITCase extends TestLogger { final Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager); + config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, + savepointDir.toURI().toString()); LOG.info("Flink configuration: " + config + "."); http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java 
---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java index 65da33f..9aaf116 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java @@ -26,7 +26,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.BlobKey; -import org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.messages.JobManagerMessages; @@ -47,6 +46,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Option; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Deadline; @@ -102,8 +102,7 @@ public class ClassLoaderITCase extends TestLogger { FOLDER.newFolder().getAbsoluteFile().toURI().toString()); // Savepoint path - config.setString(SavepointStoreFactory.SAVEPOINT_BACKEND_KEY, "filesystem"); - config.setString(SavepointStoreFactory.SAVEPOINT_DIRECTORY_KEY, + config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, FOLDER.newFolder().getAbsoluteFile().toURI().toString()); testCluster = new TestingCluster(config, false); @@ -296,7 +295,7 @@ public class ClassLoaderITCase extends TestLogger { String savepointPath = null; for (int i = 0; i < 20; i++) { LOG.info("Triggering savepoint (" + (i+1) + "/20)."); - Future<Object> savepointFuture = jm.ask(new TriggerSavepoint(jobId), deadline.timeLeft()); + Future<Object> savepointFuture = jm.ask(new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft()); Object savepointResponse = Await.result(savepointFuture, deadline.timeLeft()); http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala index 7ca9c3e..aef2604 100644 --- a/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala +++ b/flink-yarn-tests/src/test/scala/org/apache/flink/yarn/TestingYarnJobManager.scala @@ -64,7 +64,6 @@ class TestingYarnJobManager( leaderElectionService: LeaderElectionService, submittedJobGraphs : SubmittedJobGraphStore, checkpointRecoveryFactory : CheckpointRecoveryFactory, - savepointStore: SavepointStore, jobRecoveryTimeout: FiniteDuration, metricRegistry : Option[MetricRegistry]) extends YarnJobManager( @@ -79,7 +78,6 @@ class TestingYarnJobManager( leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory, - savepointStore, jobRecoveryTimeout, metricRegistry) with TestingJobManagerLike {} http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index b9d52ae..2df78c2 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -23,7 +23,6 @@ import java.util.concurrent.{ExecutorService, TimeUnit} import akka.actor.ActorRef import org.apache.flink.configuration.{ConfigConstants, Configuration => FlinkConfiguration} import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory -import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore import org.apache.flink.runtime.clusterframework.ContaineredJobManager import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory @@ -64,7 +63,6 @@ class YarnJobManager( leaderElectionService: LeaderElectionService, submittedJobGraphs : SubmittedJobGraphStore, checkpointRecoveryFactory : CheckpointRecoveryFactory, - savepointStore: SavepointStore, jobRecoveryTimeout: FiniteDuration, metricsRegistry: Option[MetricRegistry]) extends ContaineredJobManager( @@ -79,7 +77,6 @@ class YarnJobManager( leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory, - savepointStore, jobRecoveryTimeout, metricsRegistry) {
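
For reference, the user-facing API introduced in CheckpointConfig above would be used from a streaming program roughly as follows. This is a minimal sketch: the checkpoint interval, cleanup mode, and job name are illustrative values, not part of this commit, and the target directory for the externalized meta data is assumed to be set in the cluster configuration via the key behind ConfigConstants#CHECKPOINTS_DIRECTORY_KEY, as the new Javadoc describes.

    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ExternalizedCheckpointsExample {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Draw a checkpoint every 60 seconds (illustrative interval).
            env.enableCheckpointing(60000);

            // Persist checkpoint meta data externally. RETAIN_ON_CANCELLATION keeps
            // the state when the job is cancelled, so it must be cleaned up manually;
            // DELETE_ON_CANCELLATION would remove it on cancel. In both modes the
            // state is retained if the job terminates with JobStatus.FAILED.
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            // ... define sources, transformations, and sinks here ...

            env.execute("Job with externalized checkpoints");
        }
    }

As StreamingJobGraphGenerator above shows, the chosen cleanup mode is translated into ExternalizedCheckpointSettings and shipped to the runtime with the job's JobSnapshottingSettings.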