[
https://issues.apache.org/jira/browse/FLINK-31205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17820259#comment-17820259
]
TJ Banghart commented on FLINK-31205:
-------------------------------------
I happened upon this ticket while looking for issues related to multi-query
optimization. CALCITE-1440 looks like a generalization of this problem. The
proposed {{Combine}} rel node looks similar to {{MultiSink}}.
Do you have a PR for this change? Would love to see it and hopefully get
something similar in Calcite.
> do optimize for multi sink in a single relNode tree
> ----------------------------------------------------
>
> Key: FLINK-31205
> URL: https://issues.apache.org/jira/browse/FLINK-31205
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Reporter: WenJun Min
> Priority: Major
>
> Flink supports multiple sinks, but it optimizes each sink in an
> individual RelNode tree. This misses opportunities for cross-tree
> optimization, e.g.:
> {code:java}
> create table newX(
> a int,
> b bigint,
> c varchar,
> d varchar,
> e varchar
> ) with (
> 'connector' = 'values'
> ,'enable-projection-push-down' = 'true'
> );
> insert into sink_table select a, b from newX;
> insert into sink_table select a, 1 from newX;
> {code}
> This produces the plan below, which causes the source to be consumed
> twice:
> {code:java}
> Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
> +- TableSourceScan(table=[[default_catalog, default_database, newX,
> project=[a, b], metadata=[]]], fields=[a, b])
> Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
> +- Calc(select=[a, 1 AS b])
> +- TableSourceScan(table=[[default_catalog, default_database, newX,
> project=[a], metadata=[]]], fields=[a])
> {code}
> In this ticket, I propose a global optimization for multiple sinks:
> * Merge the sinks that write to the same table into a single RelNode tree with an
> extra union node
> * After optimization, split the merged union back into the original sinks
> In my PoC, step 1 produces the plan below, which I think will
> improve overall performance:
> {code:java}
> Sink(table=[default_catalog.default_database.sink_table], fields=[a, b])
> +- Union(all=[true], union=[a, b])
> :- TableSourceScan(table=[[default_catalog, default_database, newX,
> project=[a, b], metadata=[]]], fields=[a, b])(reuse_id=[1])
> +- Calc(select=[a AS $f0, CAST(1 AS BIGINT) AS $f1])
> +- Reused(reference_id=[1])
> {code}
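For readers skimming the thread, the two proposed steps (merge sinks that share a target table under a union, then split them back after optimization) can be sketched with a toy model. This is only an illustration under stated assumptions: {{Query}}, {{Sink}}, {{merge_sinks}}, {{split_sinks}} and {{shared_sources}} are hypothetical names, not Flink or Calcite classes, and the sketch does no real optimization. It only shows why the merged form exposes the shared scan of newX.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import List, Tuple

# Toy stand-ins for plan nodes. These are NOT Flink or Calcite classes.


@dataclass(frozen=True)
class Query:
    source: str              # table being scanned
    select: Tuple[str, ...]  # projected expressions


@dataclass(frozen=True)
class Sink:
    table: str
    inputs: Tuple[Query, ...]  # more than one input models a UNION ALL


def merge_sinks(sinks: List[Sink]) -> List[Sink]:
    """Step 1: group sinks by target table and union their inputs."""
    grouped = defaultdict(list)
    for sink in sinks:
        grouped[sink.table].extend(sink.inputs)
    return [Sink(table, tuple(inputs)) for table, inputs in grouped.items()]


def split_sinks(merged: List[Sink]) -> List[Sink]:
    """Step 2: turn each union branch back into its own sink."""
    return [Sink(s.table, (q,)) for s in merged for q in s.inputs]


def shared_sources(sink: Sink) -> List[str]:
    """Sources read by more than one branch: candidates for scan reuse."""
    counts = defaultdict(int)
    for q in sink.inputs:
        counts[q.source] += 1
    return [src for src, n in counts.items() if n > 1]


# The two INSERT statements from the issue description.
original = [
    Sink("sink_table", (Query("newX", ("a", "b")),)),
    Sink("sink_table", (Query("newX", ("a", "CAST(1 AS BIGINT)")),)),
]
merged = merge_sinks(original)
```

Once both branches sit in one tree, a single pass over that tree can notice that newX is scanned twice, which matches the reuse_id in the PoC plan quoted above. In this toy model the split step restores the original sinks unchanged; in a real planner the branches would carry whatever rewrites the optimizer applied.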
--
This message was sent by Atlassian Jira
(v8.20.10#820010)