Repository: beam
Updated Branches:
  refs/heads/master c33cb0340 -> 20d88dbfc

Fix min_timestamp used for KafkaIO watermark.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3362d1f5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3362d1f5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3362d1f5

Branch: refs/heads/master
Commit: 3362d1f52bd2076908d74ff6643a483468630502
Parents: c33cb03
Author: Raghu Angadi <[email protected]>
Authored: Thu Aug 24 14:33:28 2017 -0700
Committer: [email protected] <[email protected]>
Committed: Thu Aug 24 17:48:12 2017 -0700

----------------------------------------------------------------------
 .../kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/3362d1f5/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 7fb4260..dae4c1d 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -82,6 +82,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -899,7 +900,7 @@ public class KafkaIO {
     private transient ConsumerSpEL consumerSpEL;
 
     /** watermark before any records have been read. */
-    private static Instant initialWatermark = new Instant(Long.MIN_VALUE);
+    private static Instant initialWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
 
     @Override
     public String toString() {
