Repository: beam Updated Branches: refs/heads/master caecac3b4 -> 32f22b7d9
Shutdown Flink Streaming Pipeline when reaching +Inf watermark Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9c83ffe0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9c83ffe0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9c83ffe0 Branch: refs/heads/master Commit: 9c83ffe0cdc6636d2187bf9439a73a3b45756d50 Parents: caecac3 Author: Aljoscha Krettek <[email protected]> Authored: Mon Jun 5 12:19:00 2017 +0200 Committer: Ismaël MejÃa <[email protected]> Committed: Wed Jun 7 23:13:52 2017 +0200 ---------------------------------------------------------------------- .../wrappers/streaming/io/UnboundedSourceWrapper.java | 5 +++++ 1 file changed, 5 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9c83ffe0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 6055a43..e75072a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -31,6 +31,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; @@ -436,6 +437,10 @@ public class UnboundedSourceWrapper< } } context.emitWatermark(new Watermark(watermarkMillis)); + + if (watermarkMillis >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { + this.isRunning = false; + } } setNextWatermarkTimer(this.runtimeContext); }
