Aitozi created FLINK-31205:
------------------------------

             Summary: do optimize for multi sink in a single relNode tree 
                 Key: FLINK-31205
                 URL: https://issues.apache.org/jira/browse/FLINK-31205
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
            Reporter: Aitozi


Flink supports multi sink usage, but it optimize the each sink in a individual 
RelNode tree, this will miss some opportunity to do some cross tree 
optimization, eg: 


{code:java}
create table newX(
  a int,
  b bigint,
  c varchar,
  d varchar,
  e varchar
) with (
  'connector' = 'values'
  ,'enable-projection-push-down' = 'true'


insert into sink_table select a, b from newX
insert into sink_table select a, 1 from newX
{code}

It will produce the plan as below, this will cause the source be consumed twice


{code:java}
Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
+- TableSourceScan(table=[[default_catalog, default_database, newX, project=[a, 
b], metadata=[]]], fields=[a, b])

Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
+- Calc(select=[a, 1 AS b])
   +- TableSourceScan(table=[[default_catalog, default_database, newX, 
project=[a], metadata=[]]], fields=[a])

{code}

In this ticket, I propose to do a global optimization for the multi sink by 
* Megre the multi sink(with same table) into a single relNode tree with an 
extra union node
* After optimization, split the merged union back to the original multi sink

In my poc, after step 1, it will produce the plan as below, I think it will do 
good for the global performacne


{code:java}
Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
+- Union(all=[true], union=[a, b])
   :- TableSourceScan(table=[[default_catalog, default_database, newX, 
project=[a, b], metadata=[]]], fields=[a, b])(reuse_id=[1])
   +- Calc(select=[a AS $f0, CAST(1 AS BIGINT) AS $f1])
      +- Reused(reference_id=[1])
{code}






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to