This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit 03c08377d3b13fcdcb489cdd46ab24c70e58705d
Merge: d046063 329e51f
Author: Maximilian Michels <[email protected]>
AuthorDate: Thu Nov 1 10:22:56 2018 +0100

    Merge pull request #6897: [BEAM-5464] Allow to configure ExecutionMode for batch pipelines

 .../beam/runners/flink/FlinkExecutionEnvironments.java |  3 +++
 .../org/apache/beam/runners/flink/FlinkPipelineOptions.java | 12 ++++++++++++
 .../org/apache/beam/runners/flink/PipelineOptionsTest.java |  2 ++
 3 files changed, 17 insertions(+)

diff --cc runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index b695c37,a84b964..234b457
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@@ -187,4 -189,12 +189,14 @@@ public interface FlinkPipelineOption
    Long getLatencyTrackingInterval();
  
    void setLatencyTrackingInterval(Long interval);
+ 
+   @Description(
 -      "Flink mode for data exchange for batch pipeline. "
 -          + "Reference {@link org.apache.flink.api.common.ExecutionMode}")
++      "Flink mode for data exchange of batch pipelines. "
++          + "Reference {@link org.apache.flink.api.common.ExecutionMode}. "
++          + "Set this to BATCH_FORCED if pipelines get blocked, see "
++          + "https://issues.apache.org/jira/browse/FLINK-10672")
+   @Default.Enum(PIPELINED)
+   ExecutionMode getExecutionModeForBatch();
+ 
+   void setExecutionModeForBatch(ExecutionMode executionMode);
  }
diff --cc runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 49cdbe8,49cdbe8..257501f
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@@ -38,6 -38,6 +38,7 @@@ import org.apache.beam.sdk.values.Tuple
  import org.apache.beam.sdk.values.WindowingStrategy;
  import org.apache.commons.lang3.SerializationUtils;
  import org.apache.flink.api.common.ExecutionConfig;
++import org.apache.flink.api.common.ExecutionMode;
  import org.apache.flink.api.common.typeinfo.TypeHint;
  import org.apache.flink.api.common.typeinfo.TypeInformation;
  import org.apache.flink.streaming.api.CheckpointingMode;
@@@ -84,6 -84,6 +85,7 @@@ public class PipelineOptionsTest
      assertThat(options.getStateBackend(), is(nullValue()));
      assertThat(options.getMaxBundleSize(), is(1000L));
      assertThat(options.getMaxBundleTimeMills(), is(1000L));
++    assertThat(options.getExecutionModeForBatch(), is(ExecutionMode.PIPELINED));
    }
  
    @Test(expected = Exception.class)
