[
https://issues.apache.org/jira/browse/FLINK-20231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-20231:
-----------------------------------
Labels: auto-deprioritized-major stale-minor (was:
auto-deprioritized-major)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue has been marked as
Minor but is unassigned, and neither it nor its Sub-Tasks have been updated
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is
still Minor, please either assign yourself or give an update. Afterwards,
please remove the label or in 7 days the issue will be deprioritized.
> Sql UDTF subplan reuse on correlate
> -----------------------------------
>
> Key: FLINK-20231
> URL: https://issues.apache.org/jira/browse/FLINK-20231
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 1.10.0, 1.10.1
> Reporter: Junning Liang
> Priority: Minor
> Labels: auto-deprioritized-major, stale-minor
>
> Hi all,
>
> I would like to start a discussion about subplan reuse on Correlate.
> When I wrote a test case for a UDTF with two sinks, I saw that nothing in the
> RelNode digests was reused except the TableSourceScan. The SQL is shown below.
> {code:java}
> CREATE VIEW tempTable1 as SELECT name, age, habit, length FROM sources ,
> LATERAL TABLE(SplitStringUDTF(habits)) as T(habit, length);
> INSERT INTO sinks SELECT * FROM tempTable1;
> INSERT INTO sinks1 SELECT * FROM tempTable1;
> {code}
> The RelNode digests of the two sinks are shown below.
> {code:java}
> Sink(name=[`default_catalog`.`default_database`.`sinks`], fields=[name, age, habit, length], accMode=[Acc]), rowType=[RecordType:peek_no_expand(BOOLEAN f0, RecordType:peek_no_expand(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habit, INTEGER length) f1)]
> Calc(select=[name, age, f0 AS habit, f1 AS length], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habit, INTEGER length)]
> Correlate(invocation=[SplitStringUDTF($cor1.habits)], correlate=[table(default_catalog.default_database.SplitStringUDTF($cor1.habits))], select=[name,age,habits,f0,f1], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, INTEGER f1)], joinType=[INNER], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, INTEGER f1)]
> TableSourceScan(table=[[default_catalog, default_database, sources, source: [HDFSTableSource(name, age, habits)]]], fields=[name, age, habits], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits)]
> {code}
> {code:java}
> Sink(name=[`default_catalog`.`default_database`.`sinks1`], fields=[name, age, habit, length], accMode=[Acc]), rowType=[RecordType:peek_no_expand(BOOLEAN f0, RecordType:peek_no_expand(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habit, INTEGER length) f1)]
> Calc(select=[name, age, f0 AS habit, f1 AS length], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habit, INTEGER length)]
> Correlate(invocation=[SplitStringUDTF($cor2.habits)], correlate=[table(default_catalog.default_database.SplitStringUDTF($cor2.habits))], select=[name,age,habits,f0,f1], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, INTEGER f1)], joinType=[INNER], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, INTEGER f1)]
> TableSourceScan(table=[[default_catalog, default_database, sources, source: [HDFSTableSource(name, age, habits)]]], fields=[name, age, habits], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647) habits)]
> {code}
> As we can see, only the TableSourceScan plan was reused. I found a related
> test in SubplanReuseTest.scala, but it has carried a TODO since 2019.
> I hope a solution can be proposed.
> {code:java}
> @Test def testSubplanReuseOnCorrelate(): Unit = {
>   util.addFunction("str_split", new StringSplit())
>   val sqlQuery =
>     """
>       |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, '-')) AS T(v))
>       |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v
>     """.stripMargin
>   // TODO the sub-plan of Correlate should be reused,
>   // however the digests of Correlates are different
>   util.verifyPlan(sqlQuery)
> }
> {code}
>
>
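The two Correlate digests quoted above differ only in the correlation variable name ($cor1 versus $cor2), which would be enough to defeat a reuse check based on digest equality. The following is a minimal sketch of that effect in plain Java, not Flink's or Calcite's actual implementation. The class DigestReuseSketch and its methods normalize and reusable are hypothetical names invented for illustration, and the digest strings are abbreviated from the ones in the issue. Renumbering the correlation variables before comparing is only one conceivable direction, not a fix that the issue or the planner confirms.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Toy model of digest-based subplan reuse (hypothetical, for illustration only). */
final class DigestReuseSketch {

    // Matches correlation variable names such as $cor1 or $cor2.
    private static final Pattern COR_VAR = Pattern.compile("\\$cor\\d+");

    /** Renumbers correlation variables in order of first appearance: $cor0, $cor1, ... */
    public static String normalize(String digest) {
        Map<String, String> renamed = new HashMap<>();
        Matcher m = COR_VAR.matcher(digest);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String canonical = renamed.get(m.group());
            if (canonical == null) {
                canonical = "$cor" + renamed.size();
                renamed.put(m.group(), canonical);
            }
            // quoteReplacement keeps the '$' from being read as a group reference.
            m.appendReplacement(out, Matcher.quoteReplacement(canonical));
        }
        m.appendTail(out);
        return out.toString();
    }

    /** In this toy model, two subplans are reusable when their digests are equal. */
    public static boolean reusable(String digestA, String digestB) {
        return digestA.equals(digestB);
    }

    public static void main(String[] args) {
        // Abbreviated versions of the two Correlate digests from the issue.
        String first = "Correlate(invocation=[SplitStringUDTF($cor1.habits)], joinType=[INNER])";
        String second = "Correlate(invocation=[SplitStringUDTF($cor2.habits)], joinType=[INNER])";

        System.out.println(reusable(first, second));                       // false
        System.out.println(reusable(normalize(first), normalize(second))); // true
    }
}
```

A string rewrite like this only illustrates why the raw digests do not match. A real change would presumably have to work on the RelNode tree and make sure that renumbering never merges two correlation variables that refer to different inputs.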
--
This message was sent by Atlassian Jira
(v8.20.1#820001)