Repository: beam Updated Branches: refs/heads/master 3178f07b9 -> 6aed130cc
[BEAM-1812] Add externalized checkpoint configuration to FlinkPipelineOptions Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/63327dd3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/63327dd3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/63327dd3 Branch: refs/heads/master Commit: 63327dd3878c7b7a1891d53b64d999f40565948d Parents: 3178f07 Author: Jins George <[email protected]> Authored: Tue Apr 4 16:05:57 2017 -0700 Committer: Aljoscha Krettek <[email protected]> Committed: Tue Apr 25 17:33:00 2017 +0200 ---------------------------------------------------------------------- .../flink/FlinkPipelineExecutionEnvironment.java | 8 ++++++++ .../beam/runners/flink/FlinkPipelineOptions.java | 13 +++++++++++++ .../apache/beam/runners/flink/PipelineOptionsTest.java | 9 +++++++++ 3 files changed, 30 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/63327dd3/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java index ba00036..7765a00 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java @@ -26,6 +26,7 @@ import org.apache.flink.api.java.CollectionEnvironment; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -227,6 +228,13 @@ class FlinkPipelineExecutionEnvironment { throw new IllegalArgumentException("The checkpoint interval must be positive"); } flinkStreamEnv.enableCheckpointing(checkpointInterval); + boolean externalizedCheckpoint = options.isExternalizedCheckpointsEnabled(); + boolean retainOnCancellation = options.getRetainExternalizedCheckpointsOnCancellation(); + if (externalizedCheckpoint) { + flinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints( + retainOnCancellation ? ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION + : ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION); + } } // State backend http://git-wip-us.apache.org/repos/asf/beam/blob/63327dd3/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index b769a6f..764fa5f 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -103,4 +103,17 @@ public interface FlinkPipelineOptions Boolean getEnableMetrics(); void setEnableMetrics(Boolean enableMetrics); + /** + * Enables or disables externalized checkpoints. + */ + @Description("Enables or disables externalized checkpoints. " + + "Works in conjunction with CheckpointingInterval") + @Default.Boolean(false) + Boolean isExternalizedCheckpointsEnabled(); + void setExternalizedCheckpointsEnabled(Boolean externalCheckpoints); + + @Description("Sets the behavior of externalized checkpoints on cancellation.") + @Default.Boolean(false) + Boolean getRetainExternalizedCheckpointsOnCancellation(); + void setRetainExternalizedCheckpointsOnCancellation(Boolean retainOnCancellation); } http://git-wip-us.apache.org/repos/asf/beam/blob/63327dd3/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index 9bc2c3d..23740a1 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -180,6 +180,15 @@ public class PipelineOptionsTest { } + @Test + public void testExternalizedCheckpointsConfigs() { + String[] args = new String[] { "--externalizedCheckpointsEnabled=true", + "--retainExternalizedCheckpointsOnCancellation=false" }; + final FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args) + .as(FlinkPipelineOptions.class); + assertEquals(options.isExternalizedCheckpointsEnabled(), true); + assertEquals(options.getRetainExternalizedCheckpointsOnCancellation(), false); + } private static class TestDoFn extends DoFn<String, String> {
