[ 
https://issues.apache.org/jira/browse/FLINK-20231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-20231:
-----------------------------------
    Labels: auto-deprioritized-major stale-minor  (was: 
auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Sql UDTF subplan reuse on correlate
> -----------------------------------
>
>                 Key: FLINK-20231
>                 URL: https://issues.apache.org/jira/browse/FLINK-20231
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 1.10.0, 1.10.1
>            Reporter: Junning Liang
>            Priority: Minor
>              Labels: auto-deprioritized-major, stale-minor
>
> Hi all,
>  
> I would like to start a discussion about subplan reuse on Correlate.
> When I wrote a test case for a UDTF with two sinks, I saw that nothing in the 
> RelNode digests was reused except TableSourceScan. The SQL is shown below.
> {code:java}
> CREATE VIEW tempTable1 as SELECT name, age, habit, length FROM sources , 
> LATERAL TABLE(SplitStringUDTF(habits)) as T(habit, length);
> INSERT INTO sinks SELECT * FROM tempTable1;
> INSERT INTO sinks1 SELECT * FROM tempTable1;
> {code}
> The RelNode digests of the two sinks are shown below.
> {code:java}
> Sink(name=[`default_catalog`.`default_database`.`sinks`], fields=[name, age, 
> habit, length], accMode=[Acc]), rowType=[RecordType:peek_no_expand(BOOLEAN 
> f0, RecordType:peek_no_expand(VARCHAR(2147483647) name, INTEGER age, 
> VARCHAR(2147483647) habit, INTEGER length) f1)] 
>    Calc(select=[name, age, f0 AS habit, f1 AS length], accMode=[Acc]), 
> rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, 
> VARCHAR(2147483647) habit, INTEGER length)] 
>       Correlate(invocation=[SplitStringUDTF($cor1.habits)], 
> correlate=[table(default_catalog.default_database.SplitStringUDTF($cor1.habits))],
>  select=[name,age,habits,f0,f1], rowType=[RecordType(VARCHAR(2147483647) 
> name, INTEGER age, VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, 
> INTEGER f1)], joinType=[INNER], accMode=[Acc]), 
> rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, 
> VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, INTEGER f1)]   
>           TableSourceScan(table=[[default_catalog, default_database, sources, 
> source: [HDFSTableSource(name, age, habits)]]], fields=[name, age, habits], 
> accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, 
> VARCHAR(2147483647) habits)]{code}
> {code:java}
> Sink(name=[`default_catalog`.`default_database`.`sinks1`], fields=[name, age, 
> habit, length], accMode=[Acc]), rowType=[RecordType:peek_no_expand(BOOLEAN 
> f0, RecordType:peek_no_expand(VARCHAR(2147483647) name, INTEGER age, 
> VARCHAR(2147483647) habit, INTEGER length) f1)]
>    Calc(select=[name, age, f0 AS habit, f1 AS length], accMode=[Acc]), 
> rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, 
> VARCHAR(2147483647) habit, INTEGER length)]
>       Correlate(invocation=[SplitStringUDTF($cor2.habits)], 
> correlate=[table(default_catalog.default_database.SplitStringUDTF($cor2.habits))],
>  select=[name,age,habits,f0,f1], rowType=[RecordType(VARCHAR(2147483647) 
> name, INTEGER age, VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, 
> INTEGER f1)], joinType=[INNER], accMode=[Acc]), 
> rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, 
> VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, INTEGER f1)]
>           TableSourceScan(table=[[default_catalog, default_database, sources, 
> source: [HDFSTableSource(name, age, habits)]]], fields=[name, age, habits], 
> accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, 
> VARCHAR(2147483647) habits)]
> {code}
> As we can see, only the TableSourceScan plan was reused: the two Correlate 
> digests differ in their correlation variables ($cor1 vs. $cor2). I found a 
> related test in SubplanReuseTest.scala, but it has been marked TODO since 2019.
> I hope a solution can be proposed.
> {code:java}
> @Test def testSubplanReuseOnCorrelate(): Unit = { 
>    util.addFunction("str_split", new StringSplit())
>    val sqlQuery = 
>      """ 
>        |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, '-')) 
> AS T(v)) 
>        |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v 
>      """.stripMargin 
>    // TODO the sub-plan of Correlate should be reused, 
>    // however the digests of Correlates are different 
>    util.verifyPlan(sqlQuery) 
> }
> {code}
>  
>  
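
The two Correlate digests quoted above differ only in the correlation variable id ($cor1 vs. $cor2). The following is a minimal sketch of that observation, not Flink's or Calcite's implementation: it renumbers correlation variables in a digest string in order of first appearance, so that two otherwise identical digests compare equal. The class name CorrelDigestNormalizer, the method normalize, and the shortened digest strings are hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration only; not part of Flink or Calcite. */
final class CorrelDigestNormalizer {

    // Matches correlation variables as they appear in the quoted digests, e.g. "$cor1".
    private static final Pattern COR_VAR = Pattern.compile("\\$cor(\\d+)");

    private CorrelDigestNormalizer() {}

    /**
     * Renumbers correlation variables in order of first appearance,
     * so "$cor1" in one digest and "$cor2" in another both become "$cor0".
     */
    static String normalize(String digest) {
        Map<String, Integer> renumbered = new HashMap<>();
        Matcher m = COR_VAR.matcher(digest);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String originalId = m.group(1);
            Integer newId = renumbered.get(originalId);
            if (newId == null) {
                newId = renumbered.size();
                renumbered.put(originalId, newId);
            }
            // "$" is special in replacement strings, so quote it.
            m.appendReplacement(out, Matcher.quoteReplacement("$cor" + newId));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Shortened versions of the two Correlate digests from the issue.
        String first = "Correlate(invocation=[SplitStringUDTF($cor1.habits)], joinType=[INNER])";
        String second = "Correlate(invocation=[SplitStringUDTF($cor2.habits)], joinType=[INNER])";

        System.out.println("raw digests equal:        " + first.equals(second));
        System.out.println("normalized digests equal: "
                + normalize(first).equals(normalize(second)));
    }
}
```

This only shows why the string digests stop matching. An actual fix in the planner would presumably have to work on the RelNode tree and respect the scope of each correlation variable rather than rewrite strings.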



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
