[FLINK-8002] [table] Fix join window boundary for LESS_THAN and GREATER_THAN predicates.
This closes #4962. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c7943291 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c7943291 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c7943291 Branch: refs/heads/release-1.4 Commit: c7943291599260003304f003e89725352ae7d836 Parents: 51657fc Author: Fabian Hueske <fhue...@apache.org> Authored: Mon Nov 6 21:22:35 2017 +0100 Committer: Fabian Hueske <fhue...@apache.org> Committed: Wed Nov 8 18:44:06 2017 +0100 ---------------------------------------------------------------------- .../table/runtime/join/WindowJoinUtil.scala | 8 +++-- .../flink/table/api/stream/sql/JoinTest.scala | 38 ++++++++++++++++++-- 2 files changed, 42 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c7943291/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala index 1693c41..7006476 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala @@ -346,10 +346,14 @@ object WindowJoinUtil { leftLiteral.get - rightLiteral.get } val boundary = timePred.pred.getKind match { - case SqlKind.LESS_THAN => + case SqlKind.LESS_THAN if timePred.leftInputOnLeftSide => tmpTimeOffset - 1 - case SqlKind.GREATER_THAN => + case SqlKind.LESS_THAN if !timePred.leftInputOnLeftSide => tmpTimeOffset + 1 + case SqlKind.GREATER_THAN if timePred.leftInputOnLeftSide => + tmpTimeOffset + 1 + case SqlKind.GREATER_THAN if !timePred.leftInputOnLeftSide => + tmpTimeOffset - 1 case _ => tmpTimeOffset } http://git-wip-us.apache.org/repos/asf/flink/blob/c7943291/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala index ded5c51..8c1865c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala @@ -379,14 +379,27 @@ class JoinTest extends TableTestBase { "rowtime") verifyTimeBoundary( - "t1.c - interval '2' second >= t2.c + interval '1' second -" + - "interval '10' second and " + + "t1.c >= t2.c - interval '1' second and " + + "t1.c <= t2.c + interval '10' second", + -1000, + 10000, + "rowtime") + + verifyTimeBoundary( + "t1.c - interval '2' second >= t2.c + interval '1' second - interval '10' second and " + "t1.c <= t2.c + interval '10' second", -7000, 10000, "rowtime") verifyTimeBoundary( + "t2.c + interval '1' second - interval '10' second <= t1.c - interval '2' second and " + + "t2.c + interval '10' second >= t1.c", + -7000, + 10000, + "rowtime") + + verifyTimeBoundary( "t1.c >= t2.c - interval '10' second and " + "t1.c <= t2.c - interval '5' second", -10000, @@ -394,6 +407,27 @@ class JoinTest extends TableTestBase { "rowtime") verifyTimeBoundary( + "t2.c - interval '10' second <= t1.c and " + + "t2.c - interval '5' second >= t1.c", + -10000, + -5000, + "rowtime") + + verifyTimeBoundary( + "t1.c > t2.c - interval '2' second and " + + "t1.c < t2.c + interval '2' second", + -1999, + 1999, + "rowtime") + + verifyTimeBoundary( + "t2.c > t1.c - interval '2' second and " + + "t2.c < t1.c + interval '2' second", + -1999, + 1999, + "rowtime") + + verifyTimeBoundary( "t1.c = t2.c", 0, 0,