[ https://issues.apache.org/jira/browse/FLINK-31205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17694386#comment-17694386 ]
Aitozi commented on FLINK-31205:
--------------------------------

Looking forward to your opinions. CC [~godfreyhe] [~twalthr] [~snuyanzin]

> do optimize for multi sink in a single relNode tree
> ----------------------------------------------------
>
>                 Key: FLINK-31205
>                 URL: https://issues.apache.org/jira/browse/FLINK-31205
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: Aitozi
>            Priority: Major
>
> Flink supports multi-sink usage, but it optimizes each sink in an individual RelNode tree. This misses opportunities for cross-tree optimization, e.g.:
> {code:sql}
> create table newX(
>   a int,
>   b bigint,
>   c varchar,
>   d varchar,
>   e varchar
> ) with (
>   'connector' = 'values'
>   ,'enable-projection-push-down' = 'true'
> );
>
> insert into sink_table select a, b from newX;
> insert into sink_table select a, 1 from newX;
> {code}
> This produces the plan below, in which the source is consumed twice: projection push-down turns it into two different scans (project=[a, b] and project=[a]), so the scans are not shared.
> {code:java}
> Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
> +- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, b], metadata=[]]], fields=[a, b])
>
> Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
> +- Calc(select=[a, 1 AS b])
>    +- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a], metadata=[]]], fields=[a])
> {code}
> In this ticket, I propose a global optimization for multiple sinks:
> * Merge the multiple sinks (writing to the same table) into a single RelNode tree with an extra Union node.
> * After optimization, split the merged Union back into the original multiple sinks.
>
> In my PoC, step 1 produces the plan below, where the source is scanned once and the scan is reused by both branches. I think this will improve overall performance.
> {code:java}
> Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
> +- Union(all=[true], union=[a, b])
>    :- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, b], metadata=[]]], fields=[a, b])(reuse_id=[1])
>    +- Calc(select=[a AS $f0, CAST(1 AS BIGINT) AS $f1])
>       +- Reused(reference_id=[1])
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
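The two proposed steps (merge sinks of the same table behind a Union, then split them back) can be illustrated with a small, self-contained Java model. This is a toy sketch under stated assumptions, not the PoC or Flink planner code: it uses no Flink API, and the names `MultiSinkMergeSketch`, `Insert`, `MergedSink`, `merge` and `split` are hypothetical. Each INSERT is reduced to (sink table, source table, columns read). Step 1 groups the INSERTs by sink table and gives each source a single scan whose projection is the union of the columns its branches read, which mirrors the shared `project=[a, b]` scan in the PoC plan. Step 2 returns the original INSERTs, grouped by sink table.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model (hypothetical, not Flink planner code) of merging and splitting multiple sinks. */
public class MultiSinkMergeSketch {

    /** Hypothetical stand-in for one INSERT: sink table, source table, and the source columns it reads. */
    public record Insert(String sinkTable, String sourceTable, List<String> readColumns) {}

    /** Hypothetical stand-in for the merged tree: one sink, one shared scan per source, and the original branches. */
    public record MergedSink(String sinkTable, Map<String, Set<String>> sharedScans, List<Insert> branches) {}

    /** Step 1: group INSERTs by sink table; each source gets one scan projecting the union of the columns read. */
    public static List<MergedSink> merge(List<Insert> inserts) {
        Map<String, List<Insert>> bySink = new LinkedHashMap<>();
        for (Insert ins : inserts) {
            bySink.computeIfAbsent(ins.sinkTable(), k -> new ArrayList<>()).add(ins);
        }
        List<MergedSink> merged = new ArrayList<>();
        for (Map.Entry<String, List<Insert>> e : bySink.entrySet()) {
            // One scan per source table, shared by every branch that reads from it.
            Map<String, Set<String>> scans = new LinkedHashMap<>();
            for (Insert ins : e.getValue()) {
                scans.computeIfAbsent(ins.sourceTable(), k -> new LinkedHashSet<>()).addAll(ins.readColumns());
            }
            merged.add(new MergedSink(e.getKey(), scans, List.copyOf(e.getValue())));
        }
        return merged;
    }

    /** Step 2: split each merged Union back into one sink per original INSERT (grouped by sink table). */
    public static List<Insert> split(List<MergedSink> merged) {
        List<Insert> result = new ArrayList<>();
        for (MergedSink m : merged) {
            result.addAll(m.branches());
        }
        return result;
    }

    public static void main(String[] args) {
        List<Insert> inserts = List.of(
            new Insert("sink_table", "newX", List.of("a", "b")), // select a, b from newX
            new Insert("sink_table", "newX", List.of("a")));     // select a, 1 from newX
        List<MergedSink> merged = merge(inserts);
        System.out.println(merged.get(0).sharedScans());   // prints {newX=[a, b]}
        System.out.println(split(merged).equals(inserts)); // prints true
    }
}
```

With the two INSERTs from the example, `merge` yields one sink with a single `newX` scan projecting `[a, b]`, and `split` restores both INSERTs. Taking the union of the projected columns is what lets one scan feed both branches, at the cost of the narrower branch receiving a column it then drops in its Calc.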