[ https://issues.apache.org/jira/browse/FLINK-29138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jingsong Lee closed FLINK-29138.
--------------------------------
Resolution: Fixed
master: 2e2fb24bdbc69c9d85883081b0c29f9db254088e
release-1.15: e7c7df4c9b07667344e33c23bb92cb8a07e3ac0b
release-1.14: 54b6b69941533b756dbd4f68b43d8d4118a8c4e5
> Project pushdown not work for lookup source
> -------------------------------------------
>
> Key: FLINK-29138
> URL: https://issues.apache.org/jira/browse/FLINK-29138
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: lincoln lee
> Assignee: lincoln lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.16.0, 1.14.6, 1.15.3
>
> Attachments: image-2022-08-30-20-33-24-105.png
>
>
> Current test: LookupJoinTest#testJoinTemporalTableWithProjectionPushDown
> {code:scala}
> @Test
> def testJoinTemporalTableWithProjectionPushDown(): Unit = {
>   val sql =
>     """
>       |SELECT T.*, D.id
>       |FROM MyTable AS T
>       |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
>       |ON T.a = D.id
>     """.stripMargin
>   util.verifyExecPlan(sql)
> }
> {code}
> The optimized plan does not print the columns selected from the lookup source,
> and in fact the projection is not pushed into the lookup source at all (all
> columns are still read from the source). This is not the expected behavior.
> {code:java}
> <Resource name="optimized exec plan">
> <![CDATA[
> Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id])
> +- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id])
>    +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
> ]]>
> </Resource>
> {code}
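For the test query, only one column of `LookupTable` is actually needed: `D.id` is both the only lookup-side column in the `SELECT` list and the lookup key in `ON T.a = D.id`. The following is a minimal sketch, assuming a hypothetical helper object `RequiredLookupColumns` that is not part of Flink's API, which computes that required-column set from the schema printed in the plan (`fields=[id, name, age]`). A working projection pushdown would reduce the source's read set to this result.

```scala
// Hypothetical illustration, not Flink's API: works out which LookupTable
// columns the join in FLINK-29138's test query actually needs.
object RequiredLookupColumns {
  // Schema of LookupTable as printed in the plan: fields=[id, name, age]
  val lookupSchema: Seq[String] = Seq("id", "name", "age")

  // Lookup-side columns referenced by:
  //   SELECT T.*, D.id ... ON T.a = D.id
  val selectedFromLookup: Set[String] = Set("id") // D.id in the SELECT list
  val joinKeysOnLookup: Set[String] = Set("id")   // D.id in the ON clause

  // A column must be read from the source if it is projected or used as a
  // lookup key; the result keeps the original schema order.
  def requiredColumns(schema: Seq[String], used: Set[String]): Seq[String] =
    schema.filter(used.contains)

  def main(args: Array[String]): Unit = {
    val required =
      requiredColumns(lookupSchema, selectedFromLookup ++ joinKeysOnLookup)
    println(s"read without pushdown: ${lookupSchema.mkString(", ")}")
    println(s"read with pushdown:    ${required.mkString(", ")}")
  }
}
```

Running `main` prints the full schema (`id, name, age`) for the behavior reported here and only `id` for what projection pushdown should produce.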
>
> Incorrect intermediate optimization result:
> {code:java}
> ========= logical_rewrite ========
> optimize result:
> FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
> :- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
> +- FlinkLogicalSnapshot(period=[$cor0.proctime])
>    +- FlinkLogicalCalc(select=[id])
>       +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
> ========= time_indicator ========
> optimize result:
> FlinkLogicalCalc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id])
> +- FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
>    :- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
>    +- FlinkLogicalSnapshot(period=[$cor0.proctime])
>       +- FlinkLogicalCalc(select=[id])
>          +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
> {code}
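In the intermediate plans, `FlinkLogicalCalc(select=[id])` stays between `FlinkLogicalSnapshot` and `FlinkLogicalTableSourceScan` instead of being absorbed into the scan, so the scan still lists `fields=[id, name, age]`. The rewrite the reporter expected can be sketched on a toy plan model. The case classes `Scan`, `Calc`, `Snapshot` and the function `pushDown` are hypothetical illustrations, not Flink planner classes and not the actual fix.

```scala
// Toy plan model (hypothetical, not Flink's RelNode classes) showing the
// rewrite that was missing: a projection-only Calc directly above a table
// scan is folded into the scan, even when it sits under a Snapshot.
sealed trait Plan
final case class Scan(table: String, fields: Seq[String]) extends Plan
final case class Calc(select: Seq[String], input: Plan) extends Plan
final case class Snapshot(period: String, input: Plan) extends Plan

object ProjectPushDown {
  // Rewrites bottom-up. The model only has plain column references, so a
  // Calc is folded into the scan whenever every selected column exists in
  // the scan's field list; otherwise the Calc is kept as-is.
  def pushDown(plan: Plan): Plan = plan match {
    case Calc(select, input) =>
      pushDown(input) match {
        case Scan(table, fields) if select.forall(fields.contains) =>
          Scan(table, select)
        case other => Calc(select, other)
      }
    case Snapshot(period, input) => Snapshot(period, pushDown(input))
    case scan: Scan              => scan
  }

  def main(args: Array[String]): Unit = {
    // Lookup side of the logical_rewrite plan from the issue
    val before = Snapshot("$cor0.proctime",
      Calc(Seq("id"), Scan("LookupTable", Seq("id", "name", "age"))))
    println(pushDown(before))
  }
}
```

The Calc is kept whenever a selected column is missing from the scan, so the rewrite never changes the plan's output; a real planner rule would also have to handle computed expressions and nested fields, which this toy model ignores.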
>
> Plan comparison after the fix:
> !image-2022-08-30-20-33-24-105.png!
--
This message was sent by Atlassian Jira (v8.20.10#820010)