[
https://issues.apache.org/jira/browse/FLINK-22099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
godfrey he updated FLINK-22099:
-------------------------------
Fix Version/s: 1.14.0
> Fix bug for semi/anti window join.
> ----------------------------------
>
> Key: FLINK-22099
> URL: https://issues.apache.org/jira/browse/FLINK-22099
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Planner
> Reporter: Andy
> Assignee: Andy
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.13.0, 1.14.0
>
>
> Fix a bug in semi/anti window joins.
> {code:java}
> // code placeholder
> @Test
> def testSemiJoinIN(): Unit = {
> val sql =
> """
> |SELECT * FROM (
> | SELECT
> | a,
> | window_start,
> | window_end,
> | window_time,
> | count(*) as cnt,
> | count(distinct c) AS uv
> | FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
> | GROUP BY a, window_start, window_end, window_time
> |) L WHERE L.a IN (
> |SELECT a FROM (
> | SELECT
> | a,
> | window_start,
> | window_end,
> | window_time,
> | count(*) as cnt,
> | count(distinct c) AS uv
> | FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
> | GROUP BY a, window_start, window_end, window_time
> |) R
> |WHERE L.window_start = R.window_start AND L.window_end = R.window_end)
> """.stripMargin
> util.verifyRelPlan(sql)
> }
> @Test
> def testSemiExist(): Unit = {
> val sql =
> """
> |SELECT * FROM (
> | SELECT
> | a,
> | window_start,
> | window_end,
> | window_time,
> | count(*) as cnt,
> | count(distinct c) AS uv
> | FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
> | GROUP BY a, window_start, window_end, window_time
> |) L WHERE EXISTS (
> |SELECT * FROM (
> | SELECT
> | a,
> | window_start,
> | window_end,
> | window_time,
> | count(*) as cnt,
> | count(distinct c) AS uv
> | FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
> | GROUP BY a, window_start, window_end, window_time
> |) R
> |WHERE L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a)
> """.stripMargin
> util.verifyRelPlan(sql)
> }
> @Test
> def testAntiJoinNotExist(): Unit = {
> val sql =
> """
> |SELECT * FROM (
> | SELECT
> | a,
> | window_start,
> | window_end,
> | window_time,
> | count(*) as cnt,
> | count(distinct c) AS uv
> | FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
> | GROUP BY a, window_start, window_end, window_time
> |) L WHERE NOT EXISTS (
> |SELECT * FROM (
> | SELECT
> | a,
> | window_start,
> | window_end,
> | window_time,
> | count(*) as cnt,
> | count(distinct c) AS uv
> | FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
> | GROUP BY a, window_start, window_end, window_time
> |) R
> |WHERE L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a)
> """.stripMargin
> util.verifyRelPlan(sql)
> }
> @Test
> def testAntiJoinNotIN(): Unit = {
> val sql =
> """
> |SELECT * FROM (
> | SELECT
> | a,
> | window_start,
> | window_end,
> | window_time,
> | count(*) as cnt,
> | count(distinct c) AS uv
> | FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
> | GROUP BY a, window_start, window_end, window_time
> |) L WHERE L.a NOT IN (
> |SELECT a FROM (
> | SELECT
> | a,
> | window_start,
> | window_end,
> | window_time,
> | count(*) as cnt,
> | count(distinct c) AS uv
> | FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
> | GROUP BY a, window_start, window_end, window_time
> |) R
> |WHERE L.window_start = R.window_start AND L.window_end = R.window_end)
> """.stripMargin
> util.verifyRelPlan(sql)
> }{code}
> Running any of the SQL queries above currently throws an `ArrayIndexOutOfBoundsException`.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)