Repository: flink
Updated Branches:
  refs/heads/release-0.10 7c4cde3a6 -> 42bef80a8


[FLINK-3024] Fix TimestampExtractor.getCurrentWatermark() Behaviour

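Previously the internal currentWatermark was overwritten on every timer
trigger, even if the value returned from getCurrentWatermark() was lower
than the current watermark. After such a drop, a later value that was
higher than the lowered one but still below an already emitted watermark
would pass the check and be emitted.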

This can lead to problems with chaining, because there the watermark is
forwarded directly, without going through the watermark logic that
otherwise ensures that watermarks are monotonically increasing.

This adds a test that verifies that the timestamp extractor does not
emit decreasing watermarks.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/42bef80a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/42bef80a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/42bef80a

Branch: refs/heads/release-0.10
Commit: 42bef80a857ff37a1b06429374bc8647fa5fac1a
Parents: 7c4cde3
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Tue Nov 17 11:40:22 2015 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Tue Nov 17 14:38:22 2015 +0100

----------------------------------------------------------------------
 .../operators/ExtractTimestampsOperator.java    |  6 +-
 .../streaming/timestamp/TimestampITCase.java    | 76 ++++++++++++++++++++
 2 files changed, 79 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/42bef80a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
index 6e51a49..9c27c6d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
@@ -79,10 +79,10 @@ public class ExtractTimestampsOperator<T>
        public void trigger(long timestamp) throws Exception {
                // register next timer
                registerTimer(System.currentTimeMillis() + watermarkInterval, 
this);
-               long lastWatermark = currentWatermark;
-               currentWatermark = userFunction.getCurrentWatermark();
+               long newWatermark = userFunction.getCurrentWatermark();
 
-               if (currentWatermark > lastWatermark) {
+               if (newWatermark > currentWatermark) {
+                       currentWatermark = newWatermark;
                        // emit watermark
                        output.emitWatermark(new Watermark(currentWatermark));
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/42bef80a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index 749e1dd..5113b45 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -379,6 +380,73 @@ public class TimestampITCase {
        }
 
        /**
+        * This test verifies that the timestamp extractor does not emit 
decreasing watermarks even
+        * when the user-supplied TimestampExtractor returns decreasing watermark values.
+        */
+       @Test
+       public void testTimestampExtractorWithDecreasingCustomWatermarkEmit() 
throws Exception {
+               final int NUM_ELEMENTS = 10;
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", 
cluster.getLeaderRPCPort());
+               env.setParallelism(1);
+               env.getConfig().disableSysoutLogging();
+               env.getConfig().enableTimestamps();
+               env.getConfig().setAutoWatermarkInterval(1);
+
+
+               DataStream<Integer> source1 = env.addSource(new 
SourceFunction<Integer>() {
+                       @Override
+                       public void run(SourceContext<Integer> ctx) throws 
Exception {
+                               int index = 0;
+                               while (index < NUM_ELEMENTS) {
+                                       ctx.collect(index);
+                                       Thread.sleep(100);
+                                       ctx.collect(index - 1);
+                                       latch.await();
+                                       index++;
+                               }
+                       }
+
+                       @Override
+                       public void cancel() {
+
+                       }
+               });
+
+               source1.assignTimestamps(new TimestampExtractor<Integer>() {
+                       @Override
+                       public long extractTimestamp(Integer element, long 
currentTimestamp) {
+                               return element;
+                       }
+
+                       @Override
+                       public long extractWatermark(Integer element, long 
currentTimestamp) {
+                               return element - 1;
+                       }
+
+                       @Override
+                       public long getCurrentWatermark() {
+                               return Long.MIN_VALUE;
+                       }
+               })
+                               .transform("Watermark Check", 
BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
+                               .transform("Timestamp Check", 
BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
+
+
+               env.execute();
+
+               // verify that we get NUM_ELEMENTS watermarks
+               for (int j = 0; j < NUM_ELEMENTS; j++) {
+                       if 
(!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j - 1))) {
+                               Assert.fail("Wrong watermark.");
+                       }
+               }
+               if 
(!CustomOperator.finalWatermarks[0].get(NUM_ELEMENTS).equals(new 
Watermark(Long.MAX_VALUE))) {
+                       Assert.fail("Wrong watermark.");
+               }
+       }
+
+       /**
         * This tests whether the program throws an exception when an 
event-time source tries
         * to emit without timestamp.
         */
@@ -442,6 +510,10 @@ public class TimestampITCase {
                public static List<Watermark>[] finalWatermarks = new 
List[PARALLELISM];
                private long oldTimestamp;
 
+               public CustomOperator() {
+                       setChainingStrategy(ChainingStrategy.ALWAYS);
+               }
+
                @Override
                public void processElement(StreamRecord<Integer> element) 
throws Exception {
                        if (element.getTimestamp() != element.getValue()) {
@@ -473,6 +545,10 @@ public class TimestampITCase {
 
        public static class TimestampCheckingOperator extends 
AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, 
Integer> {
 
+               public TimestampCheckingOperator() {
+                       setChainingStrategy(ChainingStrategy.ALWAYS);
+               }
+
                @Override
                public void processElement(StreamRecord<Integer> element) 
throws Exception {
                        if (element.getTimestamp() != element.getValue()) {

Reply via email to