Repository: beam
Updated Branches:
  refs/heads/master 1866a0113 -> 462335caf

Flink runner: specify checkpointTimeout through PipelineOptions.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/802f10af
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/802f10af
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/802f10af

Branch: refs/heads/master
Commit: 802f10afd5d73ba32ad90ba222f2d80216a18a4d
Parents: b8035ae
Author: Pei He <[email protected]>
Authored: Sat May 27 14:59:22 2017 +0800
Committer: Pei He <[email protected]>
Committed: Sun Jun 4 16:18:36 2017 +0800

----------------------------------------------------------------------
 .../beam/runners/flink/FlinkPipelineExecutionEnvironment.java | 2 ++
 .../org/apache/beam/runners/flink/FlinkPipelineOptions.java | 5 +++++
 2 files changed, 7 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/802f10af/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 98f7c5a..fe5dd87 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -228,6 +228,8 @@ class FlinkPipelineExecutionEnvironment {
         throw new IllegalArgumentException("The checkpoint interval must be positive");
       }
       flinkStreamEnv.enableCheckpointing(checkpointInterval, options.getCheckpointingMode());
+      flinkStreamEnv.getCheckpointConfig().setCheckpointTimeout(
+          options.getCheckpointTimeoutMillis());
       boolean externalizedCheckpoint = options.isExternalizedCheckpointsEnabled();
       boolean retainOnCancellation = options.getRetainExternalizedCheckpointsOnCancellation();
       if (externalizedCheckpoint) {

http://git-wip-us.apache.org/repos/asf/beam/blob/802f10af/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index ee07abb..c255672 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -76,6 +76,11 @@ public interface FlinkPipelineOptions
   CheckpointingMode getCheckpointingMode();
   void setCheckpointingMode(CheckpointingMode mode);
 
+  @Description("The maximum time that a checkpoint may take before being discarded.")
+  @Default.Long(20 * 60 * 1000)
+  Long getCheckpointTimeoutMillis();
+  void setCheckpointTimeoutMillis(Long checkpointTimeoutMillis);
+
   @Description("Sets the number of times that failed tasks are re-executed. "
       + "A value of zero effectively disables fault tolerance. A value of -1 indicates "
       + "that the system default value (as defined in the configuration) should be used.")
