[FLINK-7564] [table] Fix watermark semantics in rowtime unbounded OVER window.
This closes #4633.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7f4a58d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7f4a58d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7f4a58d4

Branch: refs/heads/release-1.3
Commit: 7f4a58d4fe004d603e8bf2a9b3d19e443b104662
Parents: 3211e8a
Author: Xingcan Cui <xingc...@gmail.com>
Authored: Fri Sep 1 09:16:21 2017 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue Sep 5 15:49:50 2017 +0200

----------------------------------------------------------------------
 .../flink/table/runtime/aggregate/RowTimeUnboundedOver.scala | 2 +-
 .../flink/table/runtime/harness/OverWindowHarnessTest.scala | 6 ++++++
 2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/7f4a58d4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
index d4b5d23a..51de372 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
@@ -112,7 +112,7 @@ abstract class RowTimeUnboundedOver(
     val curWatermark = ctx.timerService().currentWatermark()
     // discard late record
-    if (timestamp >= curWatermark) {
+    if (timestamp > curWatermark) {
       // ensure every key just registers one timer
       ctx.timerService.registerEventTimeTimer(curWatermark + 1)
http://git-wip-us.apache.org/repos/asf/flink/blob/7f4a58d4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index 8cad64f..7086bc6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -781,7 +781,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
     testHarness.processWatermark(20000)
     testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 20000)) // test late data
+    testHarness.processElement(new StreamRecord(
       CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 20001)) // clean-up 5000
+    testHarness.setProcessingTime(2500)
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 20002)) // clean-up 5000
@@ -929,7 +932,10 @@ class OverWindowHarnessTest extends HarnessTestBase{
     testHarness.processWatermark(20000)
     testHarness.processElement(new StreamRecord(
+      CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 20000)) // test late data
+    testHarness.processElement(new StreamRecord(
       CRow(Row.of(1: JInt, 0L: JLong, 0: JInt, "ccc", 1L: JLong), true), 20001)) // clean-up 5000
+    testHarness.setProcessingTime(2500)
     testHarness.processElement(new StreamRecord(
       CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "ccc", 2L: JLong), true), 20002)) // clean-up 5000