[ https://issues.apache.org/jira/browse/FLINK-12424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
godfrey he updated FLINK-12424:
-------------------------------
Description:
Currently, the Flink planner optimizes the plan inside the {{writeToSink}} method. If
a query has more than one sink, each sink tree is optimized independently, and
the resulting execution plans are also completely independent. As a result,
sub-plans shared by several sinks are likely to be computed more than once in a
multiple-sink query. This issue aims to resolve that problem. The basic idea of
the solution is as follows:
1. Lazy optimization: {{writeToSink}} no longer optimizes the plan; it only adds
the plan to a collection.
2. Whole-plan optimization and execution: a new {{execute}} method is added to
{{TableEnvironment}}; it triggers the optimization of the whole plan and then
executes the job.
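A minimal sketch of the two steps above, assuming a toy environment rather than Flink's real classes: {{LazyTableEnv}}, {{SinkPlan}} and the {{optimizeAll}} callback are hypothetical names used only for illustration. It shows that {{writeToSink}} merely records each sink plan, and that {{execute}} hands all recorded plans to the optimizer in a single call.

{code:scala}
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified sink plan: the sink name plus the query feeding it.
final case class SinkPlan(sinkName: String, query: String)

// Hypothetical stand-in for TableEnvironment, illustrating lazy optimization only.
final class LazyTableEnv {
  // Step 1: writeToSink only records the plan; nothing is optimized yet.
  private val bufferedSinks = ListBuffer.empty[SinkPlan]

  def writeToSink(query: String, sinkName: String): Unit =
    bufferedSinks += SinkPlan(sinkName, query)

  def pendingSinks: Seq[SinkPlan] = bufferedSinks.toList

  // Step 2: execute passes every buffered sink plan to the optimizer at once,
  // so the optimizer can see sub-plans shared between sinks.
  def execute(optimizeAll: Seq[SinkPlan] => String): String = {
    val physicalPlan = optimizeAll(bufferedSinks.toList)
    bufferedSinks.clear()
    physicalPlan
  }
}

val env = new LazyTableEnv
env.writeToSink("select a1, b1 from TempTable where a1 >= 70", "Sink1")
env.writeToSink("select a1, c2 from TempTable where a1 < 70", "Sink2")
// Both sinks are still pending: writeToSink did not optimize anything.
println(env.pendingSinks.map(_.sinkName)) // List(Sink1, Sink2)

// A dummy "optimizer" that only reports which sinks it received together.
val result = env.execute(sinks => sinks.map(_.sinkName).mkString("optimized together: ", ", ", ""))
println(result) // optimized together: Sink1, Sink2
{code}

Because the optimizer receives every sink in one call, it has the chance to detect sub-plans that several sinks share, which is not possible while each sink is optimized inside its own {{writeToSink}} call.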
The basic idea of DAG (multiple-sink query) optimization is:
1. Decompose the DAG into blocks; the leaf block is the common sub-plan.
2. Optimize the blocks from the leaf blocks up to the root blocks, so that each
block needs to be optimized only once.
For example:
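The block decomposition can be sketched as follows, assuming a hypothetical {{Node}} type and treating a node as a block root when it is a sink or is consumed by more than one parent. That rule is an assumption made for illustration, not necessarily the exact rule the Flink planner uses. Blocks are returned in post-order, so leaf blocks come before the blocks that read from them, and each block appears exactly once.

{code:scala}
import scala.collection.mutable

// Hypothetical plan node. A plain class compares by reference, so a node is
// "shared" only if the same object is an input of several parents.
final class Node(val name: String, val inputs: List[Node] = Nil)

// A block is rooted at a sink or a shared node; dependsOn lists the roots of
// the blocks it reads from.
final case class Block(root: Node, dependsOn: List[Node])

// Count the parents of every node, expanding each distinct node only once.
def consumerCounts(sinks: List[Node]): Map[Node, Int] = {
  val counts = mutable.Map.empty[Node, Int]
  val visited = mutable.Set.empty[Node]
  def visit(n: Node): Unit =
    if (visited.add(n)) n.inputs.foreach { in =>
      counts(in) = counts.getOrElse(in, 0) + 1
      visit(in)
    }
  sinks.foreach(visit)
  counts.toMap
}

// Split the DAG into blocks, ordered from leaf blocks to root (sink) blocks.
def decompose(sinks: List[Node]): List[Block] = {
  val counts = consumerCounts(sinks)
  def isBlockRoot(n: Node): Boolean = sinks.contains(n) || counts.getOrElse(n, 0) > 1
  // Walk down from n and stop at the nearest block roots.
  def childBlocks(n: Node): List[Node] =
    n.inputs.flatMap(in => if (isBlockRoot(in)) List(in) else childBlocks(in)).distinct
  val ordered = mutable.LinkedHashMap.empty[Node, Block]
  def visit(root: Node): Unit =
    if (!ordered.contains(root)) {
      val deps = childBlocks(root)
      deps.foreach(visit)               // input blocks first
      ordered(root) = Block(root, deps) // then the block itself, exactly once
    }
  sinks.foreach(visit)
  ordered.values.toList
}

// The example query of this issue: both sinks read the join registered as TempTable.
val join = new Node("Join(a1 = b2)", List(
  new Node("Calc(a > 0)", List(new Node("Scan(MyTable)"))),
  new Node("Calc(c is not null)", List(new Node("Scan(MyTable)")))))
val sink1 = new Node("Sink1", List(new Node("Calc(a1 >= 70)", List(join))))
val sink2 = new Node("Sink2", List(new Node("Calc(a1 < 70)", List(join))))

val blocks = decompose(List(sink1, sink2))
blocks.foreach(b => println(s"optimize ${b.root.name}, inputs = ${b.dependsOn.map(_.name)}"))
{code}

In this model the two {{Scan(MyTable)}} nodes are separate objects to keep the example small, so only the join (the {{TempTable}} sub-plan) is detected as shared; it becomes the leaf block and is optimized before the two sink blocks.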
{code:scala}
// `util`, MyTable, Sink1 and Sink2 are assumed to be set up by the surrounding test.
// TempTable is the common sub-plan read by both sinks.
val table = util.tableEnv.sqlQuery(
  """
    |select * from
    |  (select a as a1, b as b1 from MyTable where a > 0) t1,
    |  (select b as b2, c as c2 from MyTable where c is not null) t2
    |where a1 = b2
  """.stripMargin)
util.tableEnv.registerTable("TempTable", table)

// Sink1 receives the rows with a1 >= 70, Sink2 the rows with a1 < 70.
val table1 = util.tableEnv.sqlQuery("select a1, b1 from TempTable where a1 >= 70")
util.tableEnv.writeToSink(table1, Sink1)
val table2 = util.tableEnv.sqlQuery("select a1, c2 from TempTable where a1 < 70")
util.tableEnv.writeToSink(table2, Sink2)
{code}
!image-2019-05-07-13-33-02-793.png!
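To make the duplicate computation concrete, the following sketch models the example query as a hypothetical {{Node}} graph (node names are illustrative, and the filters below the join are omitted) and counts how often each node is processed. Walking each sink tree separately processes the shared join once per sink, whereas a whole-DAG walk with a visited set processes it once.

{code:scala}
import scala.collection.mutable

// Hypothetical plan node, compared by reference as in a DAG.
final class Node(val name: String, val inputs: List[Node] = Nil)

// Per-sink planning: each sink tree is walked on its own, so a node reachable
// from k sinks is processed k times.
def independentCounts(sinks: List[Node]): Map[String, Int] = {
  val counts = mutable.Map.empty[String, Int]
  def walk(n: Node): Unit = {
    counts(n.name) = counts.getOrElse(n.name, 0) + 1
    n.inputs.foreach(walk)
  }
  sinks.foreach(walk)
  counts.toMap
}

// Whole-DAG planning: the visited set ensures every node object is processed once.
def sharedCounts(sinks: List[Node]): Map[String, Int] = {
  val counts = mutable.Map.empty[String, Int]
  val visited = mutable.Set.empty[Node]
  def walk(n: Node): Unit =
    if (visited.add(n)) {
      counts(n.name) = counts.getOrElse(n.name, 0) + 1
      n.inputs.foreach(walk)
    }
  sinks.foreach(walk)
  counts.toMap
}

// Simplified model of the example: both sinks read the same join node.
val join = new Node("Join", List(new Node("ScanLeft"), new Node("ScanRight")))
val sink1 = new Node("Sink1", List(new Node("Calc1", List(join))))
val sink2 = new Node("Sink2", List(new Node("Calc2", List(join))))

println(independentCounts(List(sink1, sink2))("Join")) // 2
println(sharedCounts(List(sink1, sink2))("Join"))      // 1
{code}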
> Supports dag (multiple-sinks query) optimization
> ------------------------------------------------
>
> Key: FLINK-12424
> URL: https://issues.apache.org/jira/browse/FLINK-12424
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / Planner
> Reporter: godfrey he
> Assignee: godfrey he
> Priority: Major
> Attachments: image-2019-05-07-13-33-02-793.png