Repository: beam
Updated Branches:
  refs/heads/master 3178f07b9 -> 6aed130cc


[BEAM-1812] Add externalized checkpoint configuration to FlinkPipelineOptions


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/63327dd3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/63327dd3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/63327dd3

Branch: refs/heads/master
Commit: 63327dd3878c7b7a1891d53b64d999f40565948d
Parents: 3178f07
Author: Jins George <[email protected]>
Authored: Tue Apr 4 16:05:57 2017 -0700
Committer: Aljoscha Krettek <[email protected]>
Committed: Tue Apr 25 17:33:00 2017 +0200

----------------------------------------------------------------------
 .../flink/FlinkPipelineExecutionEnvironment.java       |  8 ++++++++
 .../beam/runners/flink/FlinkPipelineOptions.java       | 13 +++++++++++++
 .../apache/beam/runners/flink/PipelineOptionsTest.java |  9 +++++++++
 3 files changed, 30 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/63327dd3/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index ba00036..7765a00 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.CollectionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -227,6 +228,13 @@ class FlinkPipelineExecutionEnvironment {
         throw new IllegalArgumentException("The checkpoint interval must be 
positive");
       }
       flinkStreamEnv.enableCheckpointing(checkpointInterval);
+      boolean externalizedCheckpoint = 
options.isExternalizedCheckpointsEnabled();
+      boolean retainOnCancellation = 
options.getRetainExternalizedCheckpointsOnCancellation();
+      if (externalizedCheckpoint) {
+        flinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(
+            retainOnCancellation ? 
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
+                : ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
+      }
     }
 
     // State backend

http://git-wip-us.apache.org/repos/asf/beam/blob/63327dd3/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index b769a6f..764fa5f 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -103,4 +103,17 @@ public interface FlinkPipelineOptions
   Boolean getEnableMetrics();
   void setEnableMetrics(Boolean enableMetrics);
 
+  /**
+   * Enables or disables externalized checkpoints.
+   */
+  @Description("Enables or disables externalized checkpoints. "
+      + "Works in conjunction with CheckpointingInterval")
+  @Default.Boolean(false)
+  Boolean isExternalizedCheckpointsEnabled();
+  void setExternalizedCheckpointsEnabled(Boolean externalCheckpoints);
+
+  @Description("Sets the behavior of externalized checkpoints on 
cancellation.")
+  @Default.Boolean(false)
+  Boolean getRetainExternalizedCheckpointsOnCancellation();
+  void setRetainExternalizedCheckpointsOnCancellation(Boolean 
retainOnCancellation);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/63327dd3/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 9bc2c3d..23740a1 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -180,6 +180,15 @@ public class PipelineOptionsTest {
 
   }
 
+  @Test
+  public void testExternalizedCheckpointsConfigs() {
+    String[] args = new String[] { "--externalizedCheckpointsEnabled=true",
+        "--retainExternalizedCheckpointsOnCancellation=false" };
+    final FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
+        .as(FlinkPipelineOptions.class);
+    assertEquals(options.isExternalizedCheckpointsEnabled(), true);
+    assertEquals(options.getRetainExternalizedCheckpointsOnCancellation(), 
false);
+  }
 
   private static class TestDoFn extends DoFn<String, String> {
 

Reply via email to