Repository: flink
Updated Branches:
  refs/heads/release-1.5 64117e8e6 -> 4e937aa58


[FLINK-9074] [e2e] Allow configuring externalized checkpoints for the general purpose DataStream job


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59f9c121
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59f9c121
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59f9c121

Branch: refs/heads/release-1.5
Commit: 59f9c1215899e4d3ef7ebd634d7f83d42ba741fd
Parents: 64117e8
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Wed May 9 11:40:24 2018 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue May 15 16:47:48 2018 +0800

----------------------------------------------------------------------
 .../tests/DataStreamAllroundTestJobFactory.java | 35 ++++++++++++++++++++
 1 file changed, 35 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/59f9c121/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index e7b0bdc..2577460 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -30,6 +30,7 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
@@ -53,6 +54,8 @@ import java.util.List;
  * <ul>
  *     <li>test.semantics (String, default - 'exactly-once'): This configures 
the semantics to test. Can be 'exactly-once' or 'at-least-once'.</li>
  *     <li>environment.checkpoint_interval (long, default - 1000): the 
checkpoint interval.</li>
+ *     <li>environment.externalize_checkpoint (boolean, default - false): 
whether or not checkpoints should be externalized.</li>
+ *     <li>environment.externalize_checkpoint.cleanup (String, default - 
'retain'): Configures the cleanup mode for externalized checkpoints. Can be 
'retain' or 'delete'.</li>
  *     <li>environment.parallelism (int, default - 1): parallelism to use for 
the job.</li>
  *     <li>environment.max_parallelism (int, default - 128): max parallelism 
to use for the job</li>
  *     <li>environment.restart_strategy.delay (long, default - 0): delay 
between restart attempts, in milliseconds.</li>
@@ -90,6 +93,14 @@ class DataStreamAllroundTestJobFactory {
                .key("environment.restart_strategy.delay")
                .defaultValue(0);
 
+       private static final ConfigOption<Boolean> 
ENVIRONMENT_EXTERNALIZE_CHECKPOINT = ConfigOptions
+               .key("environment.externalize_checkpoint")
+               .defaultValue(false);
+
+       private static final ConfigOption<String> 
ENVIRONMENT_EXTERNALIZE_CHECKPOINT_CLEANUP = ConfigOptions
+               .key("environment.externalize_checkpoint.cleanup")
+               .defaultValue("retain");
+
        private static final ConfigOption<String> STATE_BACKEND = ConfigOptions
                .key("state_backend")
                .defaultValue("file")
@@ -180,6 +191,30 @@ class DataStreamAllroundTestJobFactory {
                        throw new IllegalArgumentException("Unknown backend 
requested: " + stateBackend);
                }
 
+               boolean enableExternalizedCheckpoints = pt.getBoolean(
+                       ENVIRONMENT_EXTERNALIZE_CHECKPOINT.key(),
+                       ENVIRONMENT_EXTERNALIZE_CHECKPOINT.defaultValue());
+
+               if (enableExternalizedCheckpoints) {
+                       String cleanupModeConfig = pt.get(
+                               
ENVIRONMENT_EXTERNALIZE_CHECKPOINT_CLEANUP.key(),
+                               
ENVIRONMENT_EXTERNALIZE_CHECKPOINT_CLEANUP.defaultValue());
+
+                       CheckpointConfig.ExternalizedCheckpointCleanup 
cleanupMode;
+                       switch (cleanupModeConfig) {
+                               case "retain":
+                                       cleanupMode = 
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION;
+                                       break;
+                               case "delete":
+                                       cleanupMode = 
CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION;
+                                       break;
+                               default:
+                                       throw new 
IllegalArgumentException("Unknown clean up mode for externalized checkpoints: " 
+ cleanupModeConfig);
+                       }
+
+                       
env.getCheckpointConfig().enableExternalizedCheckpoints(cleanupMode);
+               }
+
                // make parameters available in the web interface
                env.getConfig().setGlobalJobParameters(pt);
        }

Reply via email to