[
https://issues.apache.org/jira/browse/FLINK-22099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Andy updated FLINK-22099:
-------------------------
Description:
Fix bug for Semi/Anti WindowJoin.
{code:java}
// code placeholder
// Test case for the semi-join variant written as an IN sub-query.
// The outer side (L) aggregates MyTable and the sub-query side (R) aggregates
// MyTable2; each side applies a 15-minute TUMBLE window on rowtime and groups by
// a, window_start, window_end and window_time. A row of L is kept when L.a is IN
// the values of R.a whose window_start and window_end equal those of L.
// Per the surrounding issue description, running this query currently fails with
// an ArrayIndexOutOfBoundsException.
// NOTE: the `MINUTE))` continuation lines sit inside the string literal without a
// `|` margin character, so stripMargin leaves those lines unchanged.
@Test
def testSemiJoinIN(): Unit = {
val sql =
"""
|SELECT * FROM (
| SELECT
| a,
| window_start,
| window_end,
| window_time,
| count(*) as cnt,
| count(distinct c) AS uv
| FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15'
MINUTE))
| GROUP BY a, window_start, window_end, window_time
|) L WHERE L.a IN (
|SELECT a FROM (
| SELECT
| a,
| window_start,
| window_end,
| window_time,
| count(*) as cnt,
| count(distinct c) AS uv
| FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15'
MINUTE))
| GROUP BY a, window_start, window_end, window_time
|) R
|WHERE L.window_start = R.window_start AND L.window_end = R.window_end)
""".stripMargin
// NOTE(review): `util` is declared outside this snippet; presumably verifyRelPlan
// plans the query and checks the resulting plan -- confirm against the test base.
util.verifyRelPlan(sql)
}
// Test case for the semi-join variant written as an EXISTS sub-query.
// The outer side (L) aggregates MyTable and the sub-query side (R) aggregates
// MyTable2; each side applies a 15-minute TUMBLE window on rowtime and groups by
// a, window_start, window_end and window_time. A row of L is kept when a row of R
// EXISTS with equal window_start, window_end and a.
// Per the surrounding issue description, running this query currently fails with
// an ArrayIndexOutOfBoundsException.
// NOTE: the `MINUTE))` and `AND L.a = R.a)` continuation lines sit inside the
// string literal without a `|` margin character, so stripMargin leaves those
// lines unchanged.
@Test
def testSemiExist(): Unit = {
val sql =
"""
|SELECT * FROM (
| SELECT
| a,
| window_start,
| window_end,
| window_time,
| count(*) as cnt,
| count(distinct c) AS uv
| FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15'
MINUTE))
| GROUP BY a, window_start, window_end, window_time
|) L WHERE EXISTS (
|SELECT * FROM (
| SELECT
| a,
| window_start,
| window_end,
| window_time,
| count(*) as cnt,
| count(distinct c) AS uv
| FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15'
MINUTE))
| GROUP BY a, window_start, window_end, window_time
|) R
|WHERE L.window_start = R.window_start AND L.window_end = R.window_end
AND L.a = R.a)
""".stripMargin
// NOTE(review): `util` is declared outside this snippet; presumably verifyRelPlan
// plans the query and checks the resulting plan -- confirm against the test base.
util.verifyRelPlan(sql)
}
// Test case for the anti-join variant written as a NOT EXISTS sub-query.
// The outer side (L) aggregates MyTable and the sub-query side (R) aggregates
// MyTable2; each side applies a 15-minute TUMBLE window on rowtime and groups by
// a, window_start, window_end and window_time. A row of L is kept when NO row of
// R exists with equal window_start, window_end and a.
// Per the surrounding issue description, running this query currently fails with
// an ArrayIndexOutOfBoundsException.
// NOTE: the `MINUTE))` and `AND L.a = R.a)` continuation lines sit inside the
// string literal without a `|` margin character, so stripMargin leaves those
// lines unchanged.
@Test
def testAntiJoinNotExist(): Unit = {
val sql =
"""
|SELECT * FROM (
| SELECT
| a,
| window_start,
| window_end,
| window_time,
| count(*) as cnt,
| count(distinct c) AS uv
| FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15'
MINUTE))
| GROUP BY a, window_start, window_end, window_time
|) L WHERE NOT EXISTS (
|SELECT * FROM (
| SELECT
| a,
| window_start,
| window_end,
| window_time,
| count(*) as cnt,
| count(distinct c) AS uv
| FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15'
MINUTE))
| GROUP BY a, window_start, window_end, window_time
|) R
|WHERE L.window_start = R.window_start AND L.window_end = R.window_end
AND L.a = R.a)
""".stripMargin
// NOTE(review): `util` is declared outside this snippet; presumably verifyRelPlan
// plans the query and checks the resulting plan -- confirm against the test base.
util.verifyRelPlan(sql)
}
// Test case for the anti-join variant written as a NOT IN sub-query.
// The outer side (L) aggregates MyTable and the sub-query side (R) aggregates
// MyTable2; each side applies a 15-minute TUMBLE window on rowtime and groups by
// a, window_start, window_end and window_time. A row of L is kept when L.a is NOT
// IN the values of R.a whose window_start and window_end equal those of L.
// Per the surrounding issue description, running this query currently fails with
// an ArrayIndexOutOfBoundsException.
// NOTE: the `MINUTE))` continuation lines sit inside the string literal without a
// `|` margin character, so stripMargin leaves those lines unchanged.
@Test
def testAntiJoinNotIN(): Unit = {
val sql =
"""
|SELECT * FROM (
| SELECT
| a,
| window_start,
| window_end,
| window_time,
| count(*) as cnt,
| count(distinct c) AS uv
| FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15'
MINUTE))
| GROUP BY a, window_start, window_end, window_time
|) L WHERE L.a NOT IN (
|SELECT a FROM (
| SELECT
| a,
| window_start,
| window_end,
| window_time,
| count(*) as cnt,
| count(distinct c) AS uv
| FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15'
MINUTE))
| GROUP BY a, window_start, window_end, window_time
|) R
|WHERE L.window_start = R.window_start AND L.window_end = R.window_end)
""".stripMargin
// NOTE(review): `util` is declared outside this snippet; presumably verifyRelPlan
// plans the query and checks the resulting plan -- confirm against the test base.
util.verifyRelPlan(sql)
}{code}
Running the SQL in the tests above currently throws an `ArrayIndexOutOfBoundsException`.
> Fix bug for semi/anti window join.
> ----------------------------------
>
> Key: FLINK-22099
> URL: https://issues.apache.org/jira/browse/FLINK-22099
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Planner
> Reporter: Andy
> Assignee: Andy
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.13.0
>
>
> Fix bug for Semi/Anti WindowJoin.
> {code:java}
> // code placeholder
> @Test
> def testSemiJoinIN(): Unit = {
> val sql =
> """
> |SELECT * FROM (
> | SELECT
> | a,
> | window_start,
> | window_end,
> | window_time,
> | count(*) as cnt,
> | count(distinct c) AS uv
> | FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15'
> MINUTE))
> | GROUP BY a, window_start, window_end, window_time
> |) L WHERE L.a IN (
> |SELECT a FROM (
> | SELECT
> | a,
> | window_start,
> | window_end,
> | window_time,
> | count(*) as cnt,
> | count(distinct c) AS uv
> | FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15'
> MINUTE))
> | GROUP BY a, window_start, window_end, window_time
> |) R
> |WHERE L.window_start = R.window_start AND L.window_end = R.window_end)
> """.stripMargin
> util.verifyRelPlan(sql)
> }
> @Test
> def testSemiExist(): Unit = {
> val sql =
> """
> |SELECT * FROM (
> | SELECT
> | a,
> | window_start,
> | window_end,
> | window_time,
> | count(*) as cnt,
> | count(distinct c) AS uv
> | FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15'
> MINUTE))
> | GROUP BY a, window_start, window_end, window_time
> |) L WHERE EXISTS (
> |SELECT * FROM (
> | SELECT
> | a,
> | window_start,
> | window_end,
> | window_time,
> | count(*) as cnt,
> | count(distinct c) AS uv
> | FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15'
> MINUTE))
> | GROUP BY a, window_start, window_end, window_time
> |) R
> |WHERE L.window_start = R.window_start AND L.window_end = R.window_end
> AND L.a = R.a)
> """.stripMargin
> util.verifyRelPlan(sql)
> }
> @Test
> def testAntiJoinNotExist(): Unit = {
> val sql =
> """
> |SELECT * FROM (
> | SELECT
> | a,
> | window_start,
> | window_end,
> | window_time,
> | count(*) as cnt,
> | count(distinct c) AS uv
> | FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15'
> MINUTE))
> | GROUP BY a, window_start, window_end, window_time
> |) L WHERE NOT EXISTS (
> |SELECT * FROM (
> | SELECT
> | a,
> | window_start,
> | window_end,
> | window_time,
> | count(*) as cnt,
> | count(distinct c) AS uv
> | FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15'
> MINUTE))
> | GROUP BY a, window_start, window_end, window_time
> |) R
> |WHERE L.window_start = R.window_start AND L.window_end = R.window_end
> AND L.a = R.a)
> """.stripMargin
> util.verifyRelPlan(sql)
> }
> @Test
> def testAntiJoinNotIN(): Unit = {
> val sql =
> """
> |SELECT * FROM (
> | SELECT
> | a,
> | window_start,
> | window_end,
> | window_time,
> | count(*) as cnt,
> | count(distinct c) AS uv
> | FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15'
> MINUTE))
> | GROUP BY a, window_start, window_end, window_time
> |) L WHERE L.a NOT IN (
> |SELECT a FROM (
> | SELECT
> | a,
> | window_start,
> | window_end,
> | window_time,
> | count(*) as cnt,
> | count(distinct c) AS uv
> | FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15'
> MINUTE))
> | GROUP BY a, window_start, window_end, window_time
> |) R
> |WHERE L.window_start = R.window_start AND L.window_end = R.window_end)
> """.stripMargin
> util.verifyRelPlan(sql)
> }{code}
> Running the SQL in the tests above currently throws an `ArrayIndexOutOfBoundsException`.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)