Repository: flink Updated Branches: refs/heads/release-0.10 7c4cde3a6 -> 42bef80a8
[FLINK-3024] Fix TimestampExtractor.getCurrentWatermark() Behaviour Previously the internal currentWatermark would be updated even if the value returned from getCurrentWatermark was lower than the current watermark. This can lead to problems with chaining because the watermark is directly forwarded without going through the watermark logic that ensures correct behaviour (monotonically increasing). This adds a test that verifies that the timestamp extractor does not emit decreasing watermarks. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42bef80a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42bef80a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42bef80a Branch: refs/heads/release-0.10 Commit: 42bef80a857ff37a1b06429374bc8647fa5fac1a Parents: 7c4cde3 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Tue Nov 17 11:40:22 2015 +0100 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Tue Nov 17 14:38:22 2015 +0100 ---------------------------------------------------------------------- .../operators/ExtractTimestampsOperator.java | 6 +- .../streaming/timestamp/TimestampITCase.java | 76 ++++++++++++++++++++ 2 files changed, 79 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/42bef80a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java index 6e51a49..9c27c6d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java @@ -79,10 +79,10 @@ public class ExtractTimestampsOperator<T> public void trigger(long timestamp) throws Exception { // register next timer registerTimer(System.currentTimeMillis() + watermarkInterval, this); - long lastWatermark = currentWatermark; - currentWatermark = userFunction.getCurrentWatermark(); + long newWatermark = userFunction.getCurrentWatermark(); - if (currentWatermark > lastWatermark) { + if (newWatermark > currentWatermark) { + currentWatermark = newWatermark; // emit watermark output.emitWatermark(new Watermark(currentWatermark)); } http://git-wip-us.apache.org/repos/asf/flink/blob/42bef80a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java index 749e1dd..5113b45 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java @@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -379,6 +380,73 @@ public class TimestampITCase { } /** + * This test verifies that the timestamp 
extractor does not emit decreasing watermarks even + * if the user-defined TimestampExtractor returns decreasing watermark values. + */ + @Test + public void testTimestampExtractorWithDecreasingCustomWatermarkEmit() throws Exception { + final int NUM_ELEMENTS = 10; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort()); + env.setParallelism(1); + env.getConfig().disableSysoutLogging(); + env.getConfig().enableTimestamps(); + env.getConfig().setAutoWatermarkInterval(1); + + + DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() { + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + int index = 0; + while (index < NUM_ELEMENTS) { + ctx.collect(index); + Thread.sleep(100); + ctx.collect(index - 1); + latch.await(); + index++; + } + } + + @Override + public void cancel() { + + } + }); + + source1.assignTimestamps(new TimestampExtractor<Integer>() { + @Override + public long extractTimestamp(Integer element, long currentTimestamp) { + return element; + } + + @Override + public long extractWatermark(Integer element, long currentTimestamp) { + return element - 1; + } + + @Override + public long getCurrentWatermark() { + return Long.MIN_VALUE; + } + }) + .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator()) + .transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator()); + + + env.execute(); + + // verify that we get NUM_ELEMENTS watermarks + for (int j = 0; j < NUM_ELEMENTS; j++) { + if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j - 1))) { + Assert.fail("Wrong watermark."); + } + } + if (!CustomOperator.finalWatermarks[0].get(NUM_ELEMENTS).equals(new Watermark(Long.MAX_VALUE))) { + Assert.fail("Wrong watermark."); + } + } + + /** + * This tests whether the program throws an exception when an event-time source tries + * to emit without timestamp. 
*/ @@ -442,6 +510,10 @@ public class TimestampITCase { public static List<Watermark>[] finalWatermarks = new List[PARALLELISM]; private long oldTimestamp; + public CustomOperator() { + setChainingStrategy(ChainingStrategy.ALWAYS); + } + @Override public void processElement(StreamRecord<Integer> element) throws Exception { if (element.getTimestamp() != element.getValue()) { @@ -473,6 +545,10 @@ public class TimestampITCase { public static class TimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> { + public TimestampCheckingOperator() { + setChainingStrategy(ChainingStrategy.ALWAYS); + } + @Override public void processElement(StreamRecord<Integer> element) throws Exception { if (element.getTimestamp() != element.getValue()) {