[FLINK-8002] [table] Fix join window boundary for LESS_THAN and GREATER_THAN 
predicates.

This closes #4962.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c7943291
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c7943291
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c7943291

Branch: refs/heads/release-1.4
Commit: c7943291599260003304f003e89725352ae7d836
Parents: 51657fc
Author: Fabian Hueske <fhue...@apache.org>
Authored: Mon Nov 6 21:22:35 2017 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Wed Nov 8 18:44:06 2017 +0100

----------------------------------------------------------------------
 .../table/runtime/join/WindowJoinUtil.scala     |  8 +++--
 .../flink/table/api/stream/sql/JoinTest.scala   | 38 ++++++++++++++++++--
 2 files changed, 42 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c7943291/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
index 1693c41..7006476 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
@@ -346,10 +346,14 @@ object WindowJoinUtil {
       leftLiteral.get - rightLiteral.get
     }
     val boundary = timePred.pred.getKind match {
-      case SqlKind.LESS_THAN =>
+      case SqlKind.LESS_THAN if timePred.leftInputOnLeftSide =>
         tmpTimeOffset - 1
-      case SqlKind.GREATER_THAN =>
+      case SqlKind.LESS_THAN if !timePred.leftInputOnLeftSide =>
         tmpTimeOffset + 1
+      case SqlKind.GREATER_THAN if timePred.leftInputOnLeftSide =>
+        tmpTimeOffset + 1
+      case SqlKind.GREATER_THAN if !timePred.leftInputOnLeftSide =>
+        tmpTimeOffset - 1
       case _ =>
         tmpTimeOffset
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/c7943291/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index ded5c51..8c1865c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -379,14 +379,27 @@ class JoinTest extends TableTestBase {
       "rowtime")
 
     verifyTimeBoundary(
-      "t1.c - interval '2' second >= t2.c + interval '1' second -" +
-        "interval '10' second and " +
+      "t1.c >= t2.c - interval '1' second and " +
+        "t1.c <= t2.c + interval '10' second",
+      -1000,
+      10000,
+      "rowtime")
+
+    verifyTimeBoundary(
+      "t1.c - interval '2' second >= t2.c + interval '1' second - interval '10' second and " +
         "t1.c <= t2.c + interval '10' second",
       -7000,
       10000,
       "rowtime")
 
     verifyTimeBoundary(
+      "t2.c + interval '1' second - interval '10' second <= t1.c - interval '2' second and " +
+        "t2.c + interval '10' second >= t1.c",
+      -7000,
+      10000,
+      "rowtime")
+
+    verifyTimeBoundary(
       "t1.c >= t2.c - interval '10' second and " +
         "t1.c <= t2.c - interval '5' second",
       -10000,
@@ -394,6 +407,27 @@ class JoinTest extends TableTestBase {
       "rowtime")
 
     verifyTimeBoundary(
+      "t2.c - interval '10' second <= t1.c and " +
+        "t2.c - interval '5' second >= t1.c",
+      -10000,
+      -5000,
+      "rowtime")
+
+    verifyTimeBoundary(
+      "t1.c > t2.c - interval '2' second and " +
+        "t1.c < t2.c + interval '2' second",
+      -1999,
+      1999,
+      "rowtime")
+
+    verifyTimeBoundary(
+      "t2.c > t1.c - interval '2' second and " +
+        "t2.c < t1.c + interval '2' second",
+      -1999,
+      1999,
+      "rowtime")
+
+    verifyTimeBoundary(
       "t1.c = t2.c",
       0,
       0,

Reply via email to