[ 
https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458614#comment-16458614
 ] 

ASF GitHub Bot commented on FLINK-8428:
---------------------------------------

Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5327#discussion_r184995939
  
    --- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala
 ---
    @@ -977,6 +922,254 @@ class JoinITCase extends StreamingWithStateTestBase {
         expected.add("D,R-8,null")
         StreamITCase.compareWithList(expected)
       }
    +
    +  /** test non-window inner join **/
    +  @Test
    +  def testNonWindowInnerJoin(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setStateBackend(getStateBackend)
    +    StreamITCase.clear
    +
    +    val data1 = new mutable.MutableList[(Int, Long, String)]
    +    data1.+=((1, 1L, "Hi1"))
    +    data1.+=((1, 2L, "Hi2"))
    +    data1.+=((1, 2L, "Hi2"))
    +    data1.+=((1, 5L, "Hi3"))
    +    data1.+=((2, 7L, "Hi5"))
    +    data1.+=((1, 9L, "Hi6"))
    +    data1.+=((1, 8L, "Hi8"))
    +    data1.+=((3, 8L, "Hi9"))
    +
    +    val data2 = new mutable.MutableList[(Int, Long, String)]
    +    data2.+=((1, 1L, "HiHi"))
    +    data2.+=((2, 2L, "HeHe"))
    +    data2.+=((3, 2L, "HeHe"))
    +
    +    val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c)
    +      .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
    +    val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c)
    +      .select(('a === 3) ? (Null(Types.INT), 'a) as 'a, 'b, 'c)
    +
    +    tEnv.registerTable("T1", t1)
    +    tEnv.registerTable("T2", t2)
    +
    +    val sqlQuery =
    +      """
    +        |SELECT t2.a, t2.c, t1.c
    +        |FROM T1 as t1 JOIN T2 as t2 ON
    +        |  t1.a = t2.a AND
    +        |  t1.b > t2.b
    +        |""".stripMargin
    +
    +    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
    +    result.addSink(new StreamITCase.StringSink[Row])
    +    env.execute()
    +
    +    val expected = mutable.MutableList(
    +      "1,HiHi,Hi2",
    +      "1,HiHi,Hi2",
    +      "1,HiHi,Hi3",
    +      "1,HiHi,Hi6",
    +      "1,HiHi,Hi8",
    +      "2,HeHe,Hi5",
    +      "null,HeHe,Hi9")
    +
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testJoin(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +    env.setStateBackend(getStateBackend)
    +    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e"
    +
    +    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 
'a, 'b, 'c)
    +    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 
'e, 'f, 'g, 'h)
    +    tEnv.registerTable("Table3", ds1)
    +    tEnv.registerTable("Table5", ds2)
    +
    +    val result = tEnv.sqlQuery(sqlQuery)
    +
    +    val expected = Seq("Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo 
Welt")
    +    val results = result.toRetractStream[Row]
    +    results.addSink(new StreamITCase.RetractingSink)
    +    env.execute()
    +    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
    +  }
    +
    +  @Test
    +  def testJoinWithFilter(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +    env.setStateBackend(getStateBackend)
    +
    +    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2"
    +
    +    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 
'a, 'b, 'c)
    +    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 
'e, 'f, 'g, 'h)
    +    tEnv.registerTable("Table3", ds1)
    +    tEnv.registerTable("Table5", ds2)
    +
    +    val result = tEnv.sqlQuery(sqlQuery)
    +
    +    val expected = Seq("Hi,Hallo")
    +    val results = result.toRetractStream[Row]
    +    results.addSink(new StreamITCase.RetractingSink)
    +    env.execute()
    +    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
    +  }
    +
    +  @Test
    +  def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +    env.setStateBackend(getStateBackend)
    +
    +    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 
AND h < b"
    +
    +    val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
    +    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 
'e, 'f, 'g, 'h)
    +    tEnv.registerTable("Table3", ds1)
    +    tEnv.registerTable("Table5", ds2)
    +
    +    val result = tEnv.sqlQuery(sqlQuery)
    +
    +    val expected = Seq("Hello world, how are you?,Hallo Welt wie", "I am 
fine.,Hallo Welt wie")
    +    val results = result.toRetractStream[Row]
    +    results.addSink(new StreamITCase.RetractingSink)
    +    env.execute()
    +    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
    +  }
    +
    +  @Test
    +  def testJoinWithMultipleKeys(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +    env.setStateBackend(getStateBackend)
    +
    +    val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d AND b = h"
    +
    +    val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
    +    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 
'e, 'f, 'g, 'h)
    +    tEnv.registerTable("Table3", ds1)
    +    tEnv.registerTable("Table5", ds2)
    +
    +    val result = tEnv.sqlQuery(sqlQuery)
    +
    +    val expected = Seq(
    +      "Hi,Hallo", "Hello,Hallo Welt", "Hello world,Hallo Welt wie gehts?", 
"Hello world,ABC",
    +      "I am fine.,HIJ", "I am fine.,IJK")
    +    val results = result.toRetractStream[Row]
    +    results.addSink(new StreamITCase.RetractingSink)
    +    env.execute()
    +    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
    +  }
    +
    +  @Test
    +  def testJoinWithAlias(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +    env.setStateBackend(getStateBackend)
    +
    +    val sqlQuery =
    +      "SELECT Table5.c, T.`1-_./Ü` FROM (SELECT a, b, c AS `1-_./Ü` FROM 
Table3) AS T, Table5 " +
    +        "WHERE a = d AND a < 4"
    +
    +    val ds1 = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 
'b, 'c)
    +    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'd, 
'e, 'f, 'g, 'c)
    +    tEnv.registerTable("Table3", ds1)
    +    tEnv.registerTable("Table5", ds2)
    +
    +    val result = tEnv.sqlQuery(sqlQuery)
    +
    +    val expected = Seq("1,Hi", "2,Hello", "1,Hello",
    +      "2,Hello world", "2,Hello world", "3,Hello world")
    +    val results = result.toRetractStream[Row]
    +    results.addSink(new StreamITCase.RetractingSink)
    +    env.execute()
    +    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
    +  }
    +
    +  @Test
    +  def testDataStreamJoinWithAggregation(): Unit = {
    --- End diff --
    
    All test names have been renamed, in both the stream and batch tests.


> Implement stream-stream non-window left outer join
> --------------------------------------------------
>
>                 Key: FLINK-8428
>                 URL: https://issues.apache.org/jira/browse/FLINK-8428
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>            Priority: Major
>
> Implement stream-stream non-window left outer join for sql/table-api. A 
> simple design doc can be found 
> [here|https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to