[ 
https://issues.apache.org/jira/browse/FLINK-20255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751957#comment-17751957
 ] 

Yunhong Zheng commented on FLINK-20255:
---------------------------------------

Hi [~lam167] . By design, your example cannot work. Now, 'UNNEST(xx) AS xx (xx) 
ON' can only with condition equals 'TRUE', like 'UNNEST(relatedUserIds) AS t 
(relatedUserId) ON TRUE'. Calcite will translate UNNEST as 'Correlate', which 
you can treat it as a nested loop join, so you cannot give it a not always true 
condition.

For your sql example, you can convert it into a filter condition and add it in 
where condition, such as
SELECT * 
FROM Messages 
CROSS JOIN UNNEST(relatedUserIds) AS t (relatedUserId) where userId = 
t.relatedUserId
 

> Nested decorrelate failed
> -------------------------
>
>                 Key: FLINK-20255
>                 URL: https://issues.apache.org/jira/browse/FLINK-20255
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.11.0, 1.12.0
>            Reporter: godfrey he
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> This issue is from ML 
> https://www.mail-archive.com/user@flink.apache.org/msg37746.html
> We can reproduce the issue through the following code
> {code:java}
> @FunctionHint(output = new DataTypeHint("ROW<val STRING>"))
> class SplitStringToRows extends TableFunction[Row] {
>   def eval(str: String, separator: String = ";"): Unit = {
>     if (str != null) {
>       str.split(separator).foreach(s => collect(Row.of(s.trim())))
>     }
>   }
> }
> object Job {
>   def main(args: Array[String]): Unit = {
>     val settings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>     val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>     val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)
>     streamTableEnv.createTemporarySystemFunction(
>       "SplitStringToRows",
>       classOf[SplitStringToRows]
>     ) // Class defined in previous email
>     streamTableEnv.executeSql(
>       """
>       CREATE TABLE table2 (
>         attr1 STRING,
>         attr2 STRING,
>         attr3 DECIMAL,
>         attr4 DATE
>       ) WITH (
>        'connector' = 'datagen'
>        )""")
>     val q2 = streamTableEnv.sqlQuery(
>       """
>         SELECT
>           a.attr1 AS attr1,
>           attr2,
>           attr3,
>           attr4
>         FROM table2 p, LATERAL TABLE(SplitStringToRows(p.attr1, ';')) AS 
> a(attr1)
>     """)
>     streamTableEnv.createTemporaryView("view2", q2)
>     val q3 =
>       """
>         SELECT
>           w.attr1,
>           p.attr3
>         FROM table2 w
>         LEFT JOIN LATERAL (
>           SELECT
>             attr1,
>             attr3
>           FROM (
>             SELECT
>               attr1,
>               attr3,
>               ROW_NUMBER() OVER (
>                 PARTITION BY attr1
>                 ORDER BY
>                   attr4 DESC NULLS LAST,
>                   w.attr2 = attr2 DESC NULLS LAST
>               ) AS row_num
>           FROM view2)
>           WHERE row_num = 1) p
>         ON (w.attr1 = p.attr1)
>         """
>     println(streamTableEnv.explainSql(q3))
>   }
> }
> {code}
> The reason is {{RelDecorrelator}} in Calcite can't handle such nested 
> decorrelate pattern now



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to