[
https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17758789#comment-17758789
]
dalongliu commented on FLINK-32940:
-----------------------------------
[~vsowrirajan] Your attempt looks good to me.
> Support projection pushdown to table source for column projections through
> UDTF
> -------------------------------------------------------------------------------
>
> Key: FLINK-32940
> URL: https://issues.apache.org/jira/browse/FLINK-32940
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Venkata krishnan Sowrirajan
> Priority: Major
>
> Currently, Flink doesn't push down columns projected through a UDTF such as
> _UNNEST_ to the table source.
> For example:
> {code:java}
> SELECT t1.deptno, t2.ename
> FROM db.dept_nested t1, UNNEST(t1.employees) AS t2{code}
> For the above SQL, Flink projects all the columns of DEPT_NESTED rather than
> only _deptno_ and _employees_. If the table source supports nested field
> projection, it should ideally read only _t1.employees.ename_ from the
> _employees_ column.
> Query plan:
> {code:java}
> == Abstract Syntax Tree ==
> LogicalProject(deptno=[$0], ename=[$5])
> +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{3}])
>    :- LogicalTableScan(table=[[hive_catalog, db, dept_nested]])
>    +- Uncollect
>       +- LogicalProject(employees=[$cor1.employees])
>          +- LogicalValues(tuples=[[{ 0 }]]){code}
> {code:java}
> == Optimized Physical Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)],
> correlate=[table($UNNEST_ROWS$1($cor1.employees))],
> select=[deptno,name,skillrecord,employees,empno,ename,skills],
> rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name,
> RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647)
> desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b)
> others) skillrecord, RecordType:peek_no_expand(BIGINT empno,
> VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647)
> type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647)
> a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT
> empno, VARCHAR(2147483647) ename,
> RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc,
> RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b)
> others) ARRAY skills)], joinType=[INNER])
>    +- TableSourceScan(table=[[hive_catalog, db, dept_nested]],
>       fields=[deptno, name, skillrecord, employees]){code}
> {code:java}
> == Optimized Execution Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)],
> correlate=[table($UNNEST_ROWS$1($cor1.employees))],
> select=[deptno,name,skillrecord,employees,empno,ename,skills],
> rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name,
> RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647)
> desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b)
> others) skillrecord, RecordType:peek_no_expand(BIGINT empno,
> VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647)
> type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647)
> a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT
> empno, VARCHAR(2147483647) ename,
> RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc,
> RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b)
> others) ARRAY skills)], joinType=[INNER])
>    +- TableSourceScan(table=[[hive_catalog, db, dept_nested]],
>       fields=[deptno, name, skillrecord, employees]){code}
>
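As a rough illustration of the nested projection the issue description asks for, the sketch below resolves the two columns the query actually needs (deptno and employees.ename) into index paths of the kind that Flink's SupportsProjectionPushDown#applyProjection(int[][], DataType) accepts. NestedProjectionSketch, Field, resolve and project are hypothetical names, not Flink APIs. Leaf types are omitted, and letting an index path step through an ARRAY element is an assumption. Whether the planner can produce such a path is the open question in this issue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of Flink): resolves dotted field paths to index paths. */
final class NestedProjectionSketch {

    /** A field with no children is a leaf; otherwise it is a row (or, by assumption, an array of rows). */
    static final class Field {
        final Map<String, Field> children = new LinkedHashMap<>();

        /** Returns the named child, creating it in declaration order if it does not exist yet. */
        Field child(String name) {
            return children.computeIfAbsent(name, k -> new Field());
        }
    }

    /** Field layout of db.dept_nested, copied from the rowType printed in the plans above. */
    static Field deptNestedSchema() {
        Field root = new Field();
        root.child("deptno");
        root.child("name");
        Field skillrecord = root.child("skillrecord");
        skillrecord.child("skilltype");
        skillrecord.child("desc");
        Field recordOthers = skillrecord.child("others");
        recordOthers.child("a");
        recordOthers.child("b");
        Field employees = root.child("employees");
        employees.child("empno");
        employees.child("ename");
        Field skills = employees.child("skills");
        skills.child("type");
        skills.child("desc");
        Field skillOthers = skills.child("others");
        skillOthers.child("a");
        skillOthers.child("b");
        return root;
    }

    /** Translates a dotted path such as "employees.ename" into positional indexes, one per nesting level. */
    static int[] resolve(Field root, String dottedPath) {
        String[] names = dottedPath.split("\\.");
        int[] indexes = new int[names.length];
        Field current = root;
        for (int i = 0; i < names.length; i++) {
            List<String> fieldNames = new ArrayList<>(current.children.keySet());
            int index = fieldNames.indexOf(names[i]);
            if (index < 0) {
                throw new IllegalArgumentException("Unknown field: " + names[i]);
            }
            indexes[i] = index;
            current = current.children.get(names[i]);
        }
        return indexes;
    }

    /** Builds one index path per requested column, in the order given. */
    static int[][] project(Field root, String... dottedPaths) {
        int[][] projection = new int[dottedPaths.length][];
        for (int i = 0; i < dottedPaths.length; i++) {
            projection[i] = resolve(root, dottedPaths[i]);
        }
        return projection;
    }

    public static void main(String[] args) {
        int[][] projection = project(deptNestedSchema(), "deptno", "employees.ename");
        System.out.println(Arrays.deepToString(projection)); // prints [[0], [3, 1]]
    }
}
```

With such a projection the scan would expose only deptno and employees.ename instead of fields=[deptno, name, skillrecord, employees] as in the plans above.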
--
This message was sent by Atlassian Jira
(v8.20.10#820010)