[
https://issues.apache.org/jira/browse/FLINK-5858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
godfrey he reassigned FLINK-5858:
---------------------------------
Assignee: godfrey he
> Support multiple sinks in same execution DAG
> --------------------------------------------
>
> Key: FLINK-5858
> URL: https://issues.apache.org/jira/browse/FLINK-5858
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: godfrey he
> Assignee: godfrey he
>
> When the writeToSink method is called to write a Table (backed by a TableSource) to a TableSink, the Table is translated into a DataSet or DataStream program. If writeToSink is called more than once (writing to different sinks), the Table is translated once per call, and the final execution graph is split into separate, disconnected DAGs that each re-read the source and re-compute the aggregation. For example:
> {code:title=Example.scala|borderStyle=solid}
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo
> import org.apache.flink.api.scala._
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala._
> import org.apache.flink.table.sinks.CsvTableSink
> import org.apache.flink.table.sources.CsvTableSource
>
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val csvTableSource = new CsvTableSource(
>   "/tmp/words",
>   Array("first", "id", "score", "last"),
>   Array(
>     BasicTypeInfo.STRING_TYPE_INFO,
>     BasicTypeInfo.INT_TYPE_INFO,
>     BasicTypeInfo.DOUBLE_TYPE_INFO,
>     BasicTypeInfo.STRING_TYPE_INFO
>   ),
>   fieldDelim = "#"
> )
> tEnv.registerTableSource("csv_source", csvTableSource)
> val resultTable = tEnv.scan("csv_source")
>   .groupBy('first)
>   .select('first, 'score.sum)
> resultTable.writeToSink(new CsvTableSink("/tmp/wordcount1"))
> resultTable.writeToSink(new CsvTableSink("/tmp/wordcount2"))
> println(tEnv.explain(resultTable))
> {code}
> Results (note that the third DAG, Stages 14-18 ending in the DiscardingOutputFormat sink, is produced by the explain() call itself, which translates the Table yet another time):
> == Abstract Syntax Tree ==
> LogicalProject(first=[$0], TMP_1=[$1])
>   LogicalAggregate(group=[{0}], TMP_0=[SUM($1)])
>     LogicalProject(first=[$0], score=[$2])
>       LogicalTableScan(table=[[csv_source]])
>
> == Optimized Logical Plan ==
> DataSetAggregate(groupBy=[first], select=[first, SUM(score) AS TMP_0])
>   BatchTableSourceScan(table=[[csv_source]], fields=[first, score])
>
> == Physical Execution Plan ==
> {color:red}
> Stage 6 : Data Source
> {color}
>   content : collect elements with CollectionInputFormat
>   Partitioning : RANDOM_PARTITIONED
>
>   Stage 5 : Map
>     content : prepare select: (first, SUM(score) AS TMP_0)
>     ship_strategy : Forward
>     exchange_mode : PIPELINED
>     driver_strategy : Map
>     Partitioning : RANDOM_PARTITIONED
>
>     Stage 4 : GroupCombine
>       content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
>       ship_strategy : Forward
>       exchange_mode : PIPELINED
>       driver_strategy : Sorted Combine
>       Partitioning : RANDOM_PARTITIONED
>
>       Stage 3 : GroupReduce
>         content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
>         ship_strategy : Hash Partition on [0]
>         exchange_mode : PIPELINED
>         driver_strategy : Sorted Group Reduce
>         Partitioning : RANDOM_PARTITIONED
>
>         Stage 2 : Map
>           content : to: Row(f0: String, f1: Double)
>           ship_strategy : Forward
>           exchange_mode : PIPELINED
>           driver_strategy : Map
>           Partitioning : RANDOM_PARTITIONED
>
>           Stage 1 : Map
>             content : Map at emitDataSet(CsvTableSink.scala:67)
>             ship_strategy : Forward
>             exchange_mode : PIPELINED
>             driver_strategy : Map
>             Partitioning : RANDOM_PARTITIONED
>
>             Stage 0 : Data Sink
>               content : TextOutputFormat (/tmp/wordcount1) - UTF-8
>               ship_strategy : Forward
>               exchange_mode : PIPELINED
>               Partitioning : RANDOM_PARTITIONED
>
> {color:red}
> Stage 13 : Data Source
> {color}
>   content : collect elements with CollectionInputFormat
>   Partitioning : RANDOM_PARTITIONED
>
>   Stage 12 : Map
>     content : prepare select: (first, SUM(score) AS TMP_0)
>     ship_strategy : Forward
>     exchange_mode : PIPELINED
>     driver_strategy : Map
>     Partitioning : RANDOM_PARTITIONED
>
>     Stage 11 : GroupCombine
>       content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
>       ship_strategy : Forward
>       exchange_mode : PIPELINED
>       driver_strategy : Sorted Combine
>       Partitioning : RANDOM_PARTITIONED
>
>       Stage 10 : GroupReduce
>         content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
>         ship_strategy : Hash Partition on [0]
>         exchange_mode : PIPELINED
>         driver_strategy : Sorted Group Reduce
>         Partitioning : RANDOM_PARTITIONED
>
>         Stage 9 : Map
>           content : to: Row(f0: String, f1: Double)
>           ship_strategy : Forward
>           exchange_mode : PIPELINED
>           driver_strategy : Map
>           Partitioning : RANDOM_PARTITIONED
>
>           Stage 8 : Map
>             content : Map at emitDataSet(CsvTableSink.scala:67)
>             ship_strategy : Forward
>             exchange_mode : PIPELINED
>             driver_strategy : Map
>             Partitioning : RANDOM_PARTITIONED
>
>             Stage 7 : Data Sink
>               content : TextOutputFormat (/tmp/wordcount2) - UTF-8
>               ship_strategy : Forward
>               exchange_mode : PIPELINED
>               Partitioning : RANDOM_PARTITIONED
>
> {color:red}
> Stage 18 : Data Source
> {color}
>   content : collect elements with CollectionInputFormat
>   Partitioning : RANDOM_PARTITIONED
>
>   Stage 17 : Map
>     content : prepare select: (first, SUM(score) AS TMP_0)
>     ship_strategy : Forward
>     exchange_mode : PIPELINED
>     driver_strategy : Map
>     Partitioning : RANDOM_PARTITIONED
>
>     Stage 16 : GroupCombine
>       content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
>       ship_strategy : Forward
>       exchange_mode : PIPELINED
>       driver_strategy : Sorted Combine
>       Partitioning : RANDOM_PARTITIONED
>
>       Stage 15 : GroupReduce
>         content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
>         ship_strategy : Hash Partition on [0]
>         exchange_mode : PIPELINED
>         driver_strategy : Sorted Group Reduce
>         Partitioning : RANDOM_PARTITIONED
>
>         Stage 14 : Data Sink
>           content : org.apache.flink.api.java.io.DiscardingOutputFormat
>           ship_strategy : Forward
>           exchange_mode : PIPELINED
>           Partitioning : RANDOM_PARTITIONED
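> Until the translation supports multiple sinks in one DAG, a user-level workaround (a minimal sketch, assuming the Flink 1.2 Scala batch Table API; env and resultTable are the values from the example above) is to translate the Table into a DataSet once and attach both sinks to that single DataSet, so the source scan and the aggregation run only once:
> {code:title=Workaround.scala|borderStyle=solid}
> import org.apache.flink.api.scala._
> import org.apache.flink.table.api.scala._
> import org.apache.flink.types.Row
>
> // Translate the Table into a DataSet exactly once ...
> val resultSet: DataSet[Row] = resultTable.toDataSet[Row]
>
> // ... and fan out to both sinks from the same DataSet. writeAsText is
> // used here because DataSet#writeAsCsv only supports Tuple types, not Row.
> resultSet.writeAsText("/tmp/wordcount1")
> resultSet.writeAsText("/tmp/wordcount2")
>
> env.execute()
> {code}
> With this, the physical plan should contain a single source/aggregate pipeline feeding two sinks. For the fix itself, one possible direction (just a sketch of the idea, not a committed design) is to let writeToSink only register the sink and defer translation to execution time, so that all registered sinks are compiled into one plan.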
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)