Github user hequn8128 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5327#discussion_r184995939
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
---
@@ -977,6 +922,254 @@ class JoinITCase extends StreamingWithStateTestBase {
expected.add("D,R-8,null")
StreamITCase.compareWithList(expected)
}
+
+ /** test non-window inner join **/
+ @Test
+ def testNonWindowInnerJoin(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.clear
+
+ val data1 = new mutable.MutableList[(Int, Long, String)]
+ data1.+=((1, 1L, "Hi1"))
+ data1.+=((1, 2L, "Hi2"))
+ data1.+=((1, 2L, "Hi2"))
+ data1.+=((1, 5L, "Hi3"))
+ data1.+=((2, 7L, "Hi5"))
+ data1.+=((1, 9L, "Hi6"))
+ data1.+=((1, 8L, "Hi8"))
+ data1.+=((3, 8L, "Hi9"))
+
+ val data2 = new mutable.MutableList[(Int, Long, String)]
+ data2.+=((1, 1L, "HiHi"))
+ data2.+=((2, 2L, "HeHe"))
+ data2.+=((3, 2L, "HeHe"))
+
+ val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c)
+ .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+ val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c)
+ .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+ tEnv.registerTable("T2", t2)
+
+ val sqlQuery =
+ """
+ |SELECT t2.a, t2.c, t1.c
+ |FROM T1 as t1 JOIN T2 as t2 ON
+ | t1.a = t2.a AND
+ | t1.b > t2.b
+ |""".stripMargin
+
+ val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,HiHi,Hi2",
+ "1,HiHi,Hi2",
+ "1,HiHi,Hi3",
+ "1,HiHi,Hi6",
+ "1,HiHi,Hi8",
+ "2,HeHe,Hi5",
+ "null,HeHe,Hi9")
+
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testJoin(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+ env.setStateBackend(getStateBackend)
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e"
+
+ val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv,
'a, 'b, 'c)
+ val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd,
'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sqlQuery(sqlQuery)
+
+ val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo
Welt")
+ val results = result.toRetractStream[Row]
+ results.addSink(new StreamITCase.RetractingSink)
+ env.execute()
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+ }
+
+ @Test
+ def testJoinWithFilter(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+ env.setStateBackend(getStateBackend)
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2"
+
+ val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv,
'a, 'b, 'c)
+ val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd,
'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sqlQuery(sqlQuery)
+
+ val expected = Seq("Hi,Hallo")
+ val results = result.toRetractStream[Row]
+ results.addSink(new StreamITCase.RetractingSink)
+ env.execute()
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+ }
+
+ @Test
+ def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+ env.setStateBackend(getStateBackend)
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6
AND h < b"
+
+ val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a,
'b, 'c)
+ val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd,
'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sqlQuery(sqlQuery)
+
+ val expected = Seq("Hello world, how are you?,Hallo Welt wie", "I am
fine.,Hallo Welt wie")
+ val results = result.toRetractStream[Row]
+ results.addSink(new StreamITCase.RetractingSink)
+ env.execute()
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+ }
+
+ @Test
+ def testJoinWithMultipleKeys(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+ env.setStateBackend(getStateBackend)
+
+ val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d AND b = h"
+
+ val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a,
'b, 'c)
+ val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd,
'e, 'f, 'g, 'h)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sqlQuery(sqlQuery)
+
+ val expected = Seq(
+ "Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt wie gehts?",
"Hello world,ABC",
+ "I am fine.,HIJ", "I am fine.,IJK")
+ val results = result.toRetractStream[Row]
+ results.addSink(new StreamITCase.RetractingSink)
+ env.execute()
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+ }
+
+ @Test
+ def testJoinWithAlias(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+ env.setStateBackend(getStateBackend)
+
+ val sqlQuery =
+ "SELECT Table5.c, T.`1-_./Ã` FROM (SELECT a, b, c AS `1-_./Ã` FROM
Table3) AS T, Table5 " +
+ "WHERE a = d AND a < 4"
+
+ val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a,
'b, 'c)
+ val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd,
'e, 'f, 'g, 'c)
+ tEnv.registerTable("Table3", ds1)
+ tEnv.registerTable("Table5", ds2)
+
+ val result = tEnv.sqlQuery(sqlQuery)
+
+ val expected = Seq("1,Hi", "2,Hello", "1,Hello",
+ "2,Hello world", "2,Hello world", "3,Hello world")
+ val results = result.toRetractStream[Row]
+ results.addSink(new StreamITCase.RetractingSink)
+ env.execute()
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+ }
+
+ @Test
+ def testDataStreamJoinWithAggregation(): Unit = {
--- End diff --
All names have been renamed, both stream and batch tests.
---