[FLINK-7564] [table] Fix watermark semantics in rowtime unbounded OVER window.

This closes #4633.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7f4a58d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7f4a58d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7f4a58d4

Branch: refs/heads/release-1.3
Commit: 7f4a58d4fe004d603e8bf2a9b3d19e443b104662
Parents: 3211e8a
Author: Xingcan Cui <xingc...@gmail.com>
Authored: Fri Sep 1 09:16:21 2017 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue Sep 5 15:49:50 2017 +0200

----------------------------------------------------------------------
 .../flink/table/runtime/aggregate/RowTimeUnboundedOver.scala   | 2 +-
 .../flink/table/runtime/harness/OverWindowHarnessTest.scala    | 6 ++++++
 2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7f4a58d4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
index d4b5d23a..51de372 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
@@ -112,7 +112,7 @@ abstract class RowTimeUnboundedOver(
     val curWatermark = ctx.timerService().currentWatermark()
 
     // discard late record
-    if (timestamp >= curWatermark) {
+    if (timestamp > curWatermark) {
       // ensure every key just registers one timer
       ctx.timerService.registerEventTimeTimer(curWatermark + 1)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7f4a58d4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index 8cad64f..7086bc6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -781,7 +781,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.processWatermark(20000)
     testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 20000)) // test late data
+    testHarness.processElement(new StreamRecord(
       CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 20001)) // clean-up 5000
+
     testHarness.setProcessingTime(2500)
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 20002)) // clean-up 5000
@@ -929,7 +932,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.processWatermark(20000)
     testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 20000)) // test late data
+    testHarness.processElement(new StreamRecord(
       CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 20001)) // clean-up 5000
+
     testHarness.setProcessingTime(2500)
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 20002)) // clean-up 5000

Reply via email to