[
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305857#comment-16305857
]
ASF GitHub Bot commented on FLINK-7797:
---------------------------------------
GitHub user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5140#discussion_r159014072
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
---
@@ -461,6 +462,302 @@ class JoinITCase extends StreamingWithStateTestBase {
StreamITCase.compareWithList(expected)
}
+ // Tests for left outer join
+ @Test
+ def testProcTimeLeftOuterJoin(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.clear
+ env.setParallelism(1)
+
+ val sqlQuery =
+ """
+ |SELECT t2.a, t2.c, t1.c
+ |FROM T1 as t1 LEFT OUTER JOIN T2 as t2 ON
+ | t1.a = t2.a AND
+ | t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND
+ | t2.proctime + INTERVAL '3' SECOND
+ |""".stripMargin
+
+ val data1 = new mutable.MutableList[(Int, Long, String)]
+ data1.+=((1, 1L, "Hi1"))
+ data1.+=((1, 2L, "Hi2"))
+ data1.+=((1, 5L, "Hi3"))
+ data1.+=((2, 7L, "Hi5"))
+
+ val data2 = new mutable.MutableList[(Int, Long, String)]
+ data2.+=((1, 1L, "HiHi"))
+ data2.+=((2, 2L, "HeHe"))
+
+ val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c,
'proctime.proctime)
+ .select('a, 'b, 'c, 'proctime)
+ val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c,
'proctime.proctime)
+ .select('a, 'b, 'c, 'proctime)
+
+ tEnv.registerTable("T1", t1)
+ tEnv.registerTable("T2", t2)
+
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+ }
+
+ @Test
+ def testRowTimeLeftOuterJoin(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStateBackend(getStateBackend)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ StreamITCase.clear
+
+ val sqlQuery =
+ """
+ |SELECT t2.key, t2.id, t1.id
+ |FROM T1 AS t1 LEFT OUTER JOIN T2 AS t2 ON
+ | t1.key = t2.key AND
+ | t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
+ | t2.rt + INTERVAL '6' SECOND
+ |""".stripMargin
+
+ val data1 = new mutable.MutableList[(String, String, Long)]
+ // for boundary test
+ data1.+=(("A", "L-1", 1000L))
+ data1.+=(("A", "L-2", 2000L))
+ data1.+=(("B", "L-4", 4000L))
+ data1.+=(("A", "L-6", 6000L))
+ data1.+=(("C", "L-7", 7000L))
+ data1.+=(("A", "L-10", 10000L))
+ data1.+=(("A", "L-12", 12000L))
+ data1.+=(("A", "L-20", 20000L))
+
+ val data2 = new mutable.MutableList[(String, String, Long)]
+ data2.+=(("A", "R-6", 6000L))
+ data2.+=(("B", "R-7", 7000L))
+ data2.+=(("D", "R-8", 8000L))
+
+ val t1 = env.fromCollection(data1)
+ .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
+ .toTable(tEnv, 'key, 'id, 'rt.rowtime)
+ val t2 = env.fromCollection(data2)
+ .assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
+ .toTable(tEnv, 'key, 'id, 'rt.rowtime)
+
+ tEnv.registerTable("T1", t1)
+ tEnv.registerTable("T2", t2)
+
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+ val expected = new java.util.ArrayList[String]
+ expected.add("A,R-6,L-1")
+ expected.add("A,R-6,L-2")
+ expected.add("A,R-6,L-6")
+ expected.add("A,R-6,L-10")
+ expected.add("A,R-6,L-12")
+ expected.add("B,R-7,L-4")
+ expected.add("null,null,L-7")
+ expected.add("null,null,L-20")
+ StreamITCase.compareWithList(expected)
+ }
+
+ // Tests for right outer join
+ @Test
+ def testProcTimeRightOuterJoin(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.clear
+ env.setParallelism(1)
+
+ val sqlQuery =
+ """
+ |SELECT t2.a, t2.c, t1.c
+ |FROM T1 as t1 RIGHT OUTER JOIN T2 as t2 ON
+ | t1.a = t2.a AND
+ | t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND
+ | t2.proctime + INTERVAL '3' SECOND
+ |""".stripMargin
+
+ val data1 = new mutable.MutableList[(Int, Long, String)]
+ data1.+=((1, 1L, "Hi1"))
+ data1.+=((1, 2L, "Hi2"))
+ data1.+=((1, 5L, "Hi3"))
+ data1.+=((2, 7L, "Hi5"))
+
+ val data2 = new mutable.MutableList[(Int, Long, String)]
+ data2.+=((1, 1L, "HiHi"))
+ data2.+=((2, 2L, "HeHe"))
+
+ val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c,
'proctime.proctime)
+ .select('a, 'b, 'c, 'proctime)
+ val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c,
'proctime.proctime)
+ .select('a, 'b, 'c, 'proctime)
+
+ tEnv.registerTable("T1", t1)
+ tEnv.registerTable("T2", t2)
+
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+ }
+
+ @Test
+ def testRowTimeRightOuterJoin(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStateBackend(getStateBackend)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ StreamITCase.clear
+
+ val sqlQuery =
+ """
+ |SELECT t2.key, t2.id, t1.id
+ |FROM T1 AS t1 RIGHT OUTER JOIN T2 AS t2 ON
+ | t1.key = t2.key AND
+ | t1.rt BETWEEN t2.rt - INTERVAL '5' SECOND AND
+ | t2.rt + INTERVAL '6' SECOND
+ |""".stripMargin
+
+ val data1 = new mutable.MutableList[(String, String, Long)]
+ // for boundary test
+ data1.+=(("A", "L-1", 1000L))
+ data1.+=(("A", "L-2", 2000L))
+ data1.+=(("B", "L-4", 4000L))
+ data1.+=(("A", "L-6", 6000L))
+ data1.+=(("C", "L-7", 7000L))
+ data1.+=(("A", "L-10", 10000L))
+ data1.+=(("A", "L-12", 12000L))
+
+ val data2 = new mutable.MutableList[(String, String, Long)]
--- End diff --
Add a row with key A to the right input (`data2`) that does not join with any row of the left input.
> Add support for windowed outer joins for streaming tables
> ---------------------------------------------------------
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.4.0
> Reporter: Fabian Hueske
> Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER
> joins.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)