[ https://issues.apache.org/jira/browse/FLINK-32940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17758824#comment-17758824 ]
Yunhong Zheng commented on FLINK-32940:
---------------------------------------
Hi [~vsowrirajan]. I think your idea is good, but the biggest problem right now is how to pass the column pruning condition from LogicalTableFunctionScan down to LogicalTableScan and then rewrite LogicalTableFunctionScan. So I think we need to add a rule in the project_rewrite stage.
# First, we need to add CoreRules.PROJECT_CORRELATE_TRANSPOSE (ProjectCorrelateTransposeRule) to FlinkBatchRuleSets to push the project down through LogicalCorrelate.
# Then we need to add a rule in the project_rewrite stage that passes this project through to the LogicalTableScan side and rewrites LogicalTableFunctionScan.
# For the NPE problem, you can add a conditional check to avoid it.
Here is an example to explain step 2, using this DDL:
{code:java}
// `util` is the planner test utility of the enclosing test class (e.g. a batch TableTestUtil).
String ddl =
    "CREATE TABLE NestedItemTable1 (\n"
        + "  `deptno` INT,\n"
        + "  `employees` MAP<varchar, varchar>\n"
        + ") WITH (\n"
        + "  'connector' = 'values',\n"
        + "  'nested-projection-supported' = 'true',\n"
        + "  'bounded' = 'true'\n"
        + ")";
util.tableEnv().executeSql(ddl);
// UNNEST over a MAP yields one (key, value) row per entry, aliased here as f(k, v).
util.verifyRelPlan(
    "SELECT t1.deptno, k FROM NestedItemTable1 t1, UNNEST(t1.employees) AS f(k, v)");{code}
After adding ProjectCorrelateTransposeRule, we get the following plan:
{code:java}
optimize project_rewrite cost 413675 ms.
optimize result:
LogicalProject(inputs=[0], exprs=[[$2]])
+- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{1}])
   :- LogicalTableScan(table=[[default_catalog, default_database, NestedItemTable1]])
   +- LogicalProject(inputs=[0])
      +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.employees)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0, VARCHAR(2147483647) f1)]){code}
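The shape of this plan follows from simple index bookkeeping. Below is a toy model in plain Java (the class and method names are hypothetical, not Calcite's API) of what a project-correlate transpose has to compute: the Correlate output fields referenced by the top project are split between the two inputs, right-side indices are re-based by the left input's field count, and the columns listed in `requiredColumns` must stay on the left because the right side reads them through the correlation variable.

{code:java}
import java.util.Set;
import java.util.TreeSet;

// Toy model with hypothetical names; this is not Calcite's ProjectCorrelateTransposeRule API.
final class CorrelateFieldSplit {
    final Set<Integer> left;  // fields the left input (the table scan) must still produce
    final Set<Integer> right; // fields the right input must produce, numbered from 0 within that input

    private CorrelateFieldSplit(Set<Integer> left, Set<Integer> right) {
        this.left = left;
        this.right = right;
    }

    /**
     * @param leftFieldCount  number of fields produced by the left input
     * @param projectRefs     Correlate output indices referenced by the Project above it
     * @param requiredColumns left-input columns the right side reads via the correlation variable
     */
    static CorrelateFieldSplit split(int leftFieldCount, Set<Integer> projectRefs, Set<Integer> requiredColumns) {
        Set<Integer> left = new TreeSet<>(requiredColumns); // correlation inputs must survive
        Set<Integer> right = new TreeSet<>();
        for (int ref : projectRefs) {
            if (ref < leftFieldCount) {
                left.add(ref);
            } else {
                right.add(ref - leftFieldCount); // re-base into the right input's own numbering
            }
        }
        return new CorrelateFieldSplit(left, right);
    }
}
{code}

With the numbers from this plan (2 left fields, the project reads $0 and $2, requiredColumns=[{1}]), the split is left {0, 1} and right {0}: the right side gets LogicalProject(inputs=[0]), but the scan still has to read the whole employees column, which is why the second rule is needed.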
I think we need to add a new rule to match this pattern:
{code:java}
LogicalCorrelate
:- LogicalTableScan
+- LogicalProject
   +- LogicalTableFunctionScan{code}
In this rule, we first need to create a new LogicalTableFunctionScan by merging the LogicalProject into the LogicalTableFunctionScan. Second, we need to add a new LogicalProject on top of the LogicalTableScan, which will then be pushed down into the LogicalTableScan in the logical stage.
IMO, the new plan after this rule matches would look like this (just an illustration, not an exact plan):
{code:java}
LogicalProject(inputs=[0], exprs=[[$2]])
+- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{1}])
   :- LogicalProject(inputs=[employees.k])
   :  +- LogicalTableScan(table=[[default_catalog, default_database, NestedItemTable1]])
   +- LogicalTableFunctionScan(invocation=[$UNNEST_ROWS$1($cor0.employees.k)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)]){code}
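The rewrite described above can be sketched on a toy plan model. The record types and the PruneUnnestThroughCorrelate class are hypothetical stand-ins for Calcite's RelNodes and rule classes, and nested paths are plain dotted strings such as employees.k, mirroring the illustrative plan. One deliberate difference from that illustration: the new Project above the scan also keeps the scan's other columns (here deptno), because the top project still reads them; pruning those is left to the regular projection pushdown.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy plan nodes for illustration only; these are NOT Calcite's RelNode classes.
interface ToyRel {}
record ToyScan(String table, List<String> fields) implements ToyRel {}
// exprs are plain field names or dotted nested paths such as "employees.k"
record ToyProject(List<String> exprs, ToyRel input) implements ToyRel {}
// args are the (correlated) columns or nested paths passed to the table function, e.g. UNNEST
record ToyTableFunctionScan(List<String> args, List<String> outputFields) implements ToyRel {}
record ToyCorrelate(ToyRel left, ToyRel right) implements ToyRel {}

final class PruneUnnestThroughCorrelate {
    /** Rewrites Correlate(Scan, Project(TableFunctionScan)); returns the input unchanged otherwise. */
    static ToyRel apply(ToyRel rel) {
        if (!(rel instanceof ToyCorrelate corr)
                || !(corr.left() instanceof ToyScan scan)
                || !(corr.right() instanceof ToyProject proj)
                || !(proj.input() instanceof ToyTableFunctionScan tfs)
                || tfs.args().size() != 1) {
            return rel;
        }
        String argColumn = tfs.args().get(0);

        // Step 1: merge the Project into the TableFunctionScan by keeping only the projected outputs.
        List<String> kept = new ArrayList<>();
        for (String e : proj.exprs()) {
            if (!tfs.outputFields().contains(e) || kept.contains(e)) {
                return rel; // the toy only handles plain, distinct field selections
            }
            kept.add(e);
        }
        if (kept.size() == tfs.outputFields().size()) {
            return rel; // nothing to prune
        }
        List<String> nestedPaths = new ArrayList<>();
        for (String field : kept) {
            nestedPaths.add(argColumn + "." + field);
        }
        ToyTableFunctionScan newTfs = new ToyTableFunctionScan(nestedPaths, kept);

        // Step 2: add a Project above the scan exposing the other columns plus only the nested
        // paths the function still reads; a later stage could push it into the source.
        List<String> leftExprs = new ArrayList<>();
        for (String f : scan.fields()) {
            if (!f.equals(argColumn)) {
                leftExprs.add(f);
            }
        }
        leftExprs.addAll(nestedPaths);
        return new ToyCorrelate(new ToyProject(leftExprs, scan), newTfs);
    }
}
{code}

Applied to Correlate(Scan(deptno, employees), Project(k, TableFunctionScan(employees -> k, v))), the sketch yields a left Project(deptno, employees.k) over the scan and a TableFunctionScan that reads employees.k and outputs only k.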
WDYT, [~vsowrirajan]? Once the solution is settled and you have finished the development, feel free to ping me for a review.
> Support projection pushdown to table source for column projections through UDTF
> -------------------------------------------------------------------------------
>
> Key: FLINK-32940
> URL: https://issues.apache.org/jira/browse/FLINK-32940
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Venkata krishnan Sowrirajan
> Priority: Major
>
> Currently, Flink doesn't push down columns projected through a UDTF such as
> _UNNEST_ to the table source.
> For example:
> {code:sql}
> SELECT t1.deptno, t2.ename FROM db.dept_nested t1, UNNEST(t1.employees) AS t2{code}
> For the above SQL, Flink projects all the columns of DEPT_NESTED rather than
> only _deptno_ and _employees_. If the table source supports nested field
> projection, ideally it should project only _t1.deptno_ and _t1.employees.ename_
> from the table source.
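The ideal projection described in the issue can be expressed as index paths into the source schema. Flink's SupportsProjectionPushDown#applyProjection receives nested projections as int[][] index paths when supportsNestedProjection() returns true; the hypothetical helper below only mimics that shape and resolves dotted references against the dept_nested schema shown in the plans. Descending into the element row of the employees ARRAY, as the employees.ename path does, is an assumption of this sketch; whether the planner and the source can express that is part of what this issue asks for.

{code:java}
import java.util.List;

// Hypothetical helper for illustration; not part of Flink's API.
final class NestedProjection {
    /** A schema field: a name plus the nested fields of its ROW (or, by assumption, array-element) type. */
    record Field(String name, List<Field> children) {
        static Field leaf(String name) {
            return new Field(name, List.of());
        }
    }

    /** Resolves dotted references such as "employees.ename" to index paths such as {3, 1}. */
    static int[][] toIndexPaths(List<Field> schema, List<String> refs) {
        int[][] out = new int[refs.size()][];
        for (int r = 0; r < refs.size(); r++) {
            String[] parts = refs.get(r).split("\\.");
            int[] path = new int[parts.length];
            List<Field> level = schema;
            for (int i = 0; i < parts.length; i++) {
                int idx = indexOf(level, parts[i]);
                path[i] = idx;
                level = level.get(idx).children(); // descend one nesting level
            }
            out[r] = path;
        }
        return out;
    }

    private static int indexOf(List<Field> fields, String name) {
        for (int i = 0; i < fields.size(); i++) {
            if (fields.get(i).name().equals(name)) {
                return i;
            }
        }
        throw new IllegalArgumentException("unknown field: " + name);
    }
}
{code}

For the query in this issue, resolving deptno and employees.ename against (deptno, name, skillrecord, employees(empno, ename, skills)) gives the paths {0} and {3, 1}.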
> Query plan:
> {code:java}
> == Abstract Syntax Tree ==
> LogicalProject(deptno=[$0], ename=[$5])
> +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{3}])
>    :- LogicalTableScan(table=[[hive_catalog, db, dept_nested]])
>    +- Uncollect
>       +- LogicalProject(employees=[$cor1.employees])
>          +- LogicalValues(tuples=[[{ 0 }]]){code}
> {code:java}
> == Optimized Physical Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)], correlate=[table($UNNEST_ROWS$1($cor1.employees))], select=[deptno,name,skillrecord,employees,empno,ename,skills], rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name, RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) skillrecord, RecordType:peek_no_expand(BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills)], joinType=[INNER])
>    +- TableSourceScan(table=[[hive_catalog, db, dept_nested]], fields=[deptno, name, skillrecord, employees]){code}
> {code:java}
> == Optimized Execution Plan ==
> Calc(select=[deptno, ename])
> +- Correlate(invocation=[$UNNEST_ROWS$1($cor1.employees)], correlate=[table($UNNEST_ROWS$1($cor1.employees))], select=[deptno,name,skillrecord,employees,empno,ename,skills], rowType=[RecordType(BIGINT deptno, VARCHAR(2147483647) name, RecordType:peek_no_expand(VARCHAR(2147483647) skilltype, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) skillrecord, RecordType:peek_no_expand(BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills) ARRAY employees, BIGINT empno, VARCHAR(2147483647) ename, RecordType:peek_no_expand(VARCHAR(2147483647) type, VARCHAR(2147483647) desc, RecordType:peek_no_expand(VARCHAR(2147483647) a, VARCHAR(2147483647) b) others) ARRAY skills)], joinType=[INNER])
>    +- TableSourceScan(table=[[hive_catalog, db, dept_nested]], fields=[deptno, name, skillrecord, employees]){code}
>