godfrey he created FLINK-5858:
---------------------------------

             Summary: Support multiple sinks in same execution DAG
                 Key: FLINK-5858
                 URL: https://issues.apache.org/jira/browse/FLINK-5858
             Project: Flink
          Issue Type: Improvement
          Components: Table API & SQL
            Reporter: godfrey he
When the writeToSink method is called to write a Table (backed by a TableSource) to a TableSink, the Table is translated into a DataSet or DataStream. If writeToSink is called more than once (writing to different sinks), the Table is translated again for each call, so the final execution graph is split into multiple disconnected DAGs that each re-read the source and recompute the aggregation. For example:

{code:title=Example.scala|borderStyle=solid}
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val csvTableSource = new CsvTableSource(
  "/tmp/words",
  Array("first", "id", "score", "last"),
  Array(
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.DOUBLE_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO
  ),
  fieldDelim = "#"
)
tEnv.registerTableSource("csv_source", csvTableSource)

val resultTable = tEnv.scan("csv_source")
  .groupBy('first)
  .select('first, 'score.sum)

resultTable.writeToSink(new CsvTableSink("/tmp/wordcount1"))
resultTable.writeToSink(new CsvTableSink("/tmp/wordcount2"))

println(tEnv.explain(resultTable))
{code}

result:

== Abstract Syntax Tree ==
LogicalProject(first=[$0], TMP_1=[$1])
  LogicalAggregate(group=[{0}], TMP_0=[SUM($1)])
    LogicalProject(first=[$0], score=[$2])
      LogicalTableScan(table=[[csv_source]])

== Optimized Logical Plan ==
DataSetAggregate(groupBy=[first], select=[first, SUM(score) AS TMP_0])
  BatchTableSourceScan(table=[[csv_source]], fields=[first, score])

== Physical Execution Plan ==
{color:red}Stage 6 : Data Source{color}
	content : collect elements with CollectionInputFormat
	Partitioning : RANDOM_PARTITIONED

Stage 5 : Map
	content : prepare select: (first, SUM(score) AS TMP_0)
	ship_strategy : Forward
	exchange_mode : PIPELINED
	driver_strategy : Map
	Partitioning : RANDOM_PARTITIONED

Stage 4 : GroupCombine
	content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
	ship_strategy : Forward
	exchange_mode : PIPELINED
	driver_strategy : Sorted Combine
	Partitioning : RANDOM_PARTITIONED

Stage 3 : GroupReduce
	content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
	ship_strategy : Hash Partition on [0]
	exchange_mode : PIPELINED
	driver_strategy : Sorted Group Reduce
	Partitioning : RANDOM_PARTITIONED

Stage 2 : Map
	content : to: Row(f0: String, f1: Double)
	ship_strategy : Forward
	exchange_mode : PIPELINED
	driver_strategy : Map
	Partitioning : RANDOM_PARTITIONED

Stage 1 : Map
	content : Map at emitDataSet(CsvTableSink.scala:67)
	ship_strategy : Forward
	exchange_mode : PIPELINED
	driver_strategy : Map
	Partitioning : RANDOM_PARTITIONED

Stage 0 : Data Sink
	content : TextOutputFormat (/tmp/wordcount1) - UTF-8
	ship_strategy : Forward
	exchange_mode : PIPELINED
	Partitioning : RANDOM_PARTITIONED

{color:red}Stage 13 : Data Source{color}
	content : collect elements with CollectionInputFormat
	Partitioning : RANDOM_PARTITIONED

Stage 12 : Map
	content : prepare select: (first, SUM(score) AS TMP_0)
	ship_strategy : Forward
	exchange_mode : PIPELINED
	driver_strategy : Map
	Partitioning : RANDOM_PARTITIONED

Stage 11 : GroupCombine
	content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
	ship_strategy : Forward
	exchange_mode : PIPELINED
	driver_strategy : Sorted Combine
	Partitioning : RANDOM_PARTITIONED

Stage 10 : GroupReduce
	content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
	ship_strategy : Hash Partition on [0]
	exchange_mode : PIPELINED
	driver_strategy : Sorted Group Reduce
	Partitioning : RANDOM_PARTITIONED

Stage 9 : Map
	content : to: Row(f0: String, f1: Double)
	ship_strategy : Forward
	exchange_mode : PIPELINED
	driver_strategy : Map
	Partitioning : RANDOM_PARTITIONED

Stage 8 : Map
	content : Map at emitDataSet(CsvTableSink.scala:67)
	ship_strategy : Forward
	exchange_mode : PIPELINED
	driver_strategy : Map
	Partitioning : RANDOM_PARTITIONED

Stage 7 : Data Sink
	content : TextOutputFormat (/tmp/wordcount2) - UTF-8
	ship_strategy : Forward
	exchange_mode : PIPELINED
	Partitioning : RANDOM_PARTITIONED

{color:red}Stage 18 : Data Source{color}
	content : collect elements with CollectionInputFormat
	Partitioning : RANDOM_PARTITIONED

Stage 17 : Map
	content : prepare select: (first, SUM(score) AS TMP_0)
	ship_strategy : Forward
	exchange_mode : PIPELINED
	driver_strategy : Map
	Partitioning : RANDOM_PARTITIONED

Stage 16 : GroupCombine
	content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
	ship_strategy : Forward
	exchange_mode : PIPELINED
	driver_strategy : Sorted Combine
	Partitioning : RANDOM_PARTITIONED

Stage 15 : GroupReduce
	content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
	ship_strategy : Hash Partition on [0]
	exchange_mode : PIPELINED
	driver_strategy : Sorted Group Reduce
	Partitioning : RANDOM_PARTITIONED

Stage 14 : Data Sink
	content : org.apache.flink.api.java.io.DiscardingOutputFormat
	ship_strategy : Forward
	exchange_mode : PIPELINED
	Partitioning : RANDOM_PARTITIONED

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
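The improvement amounts to translating the common sub-plan only once and letting every sink reuse the translated result, so all sinks end up in one DAG. A minimal sketch of that idea in plain Scala (this is not Flink code; `TranslationCache`, `translate`, and the string-valued plans are hypothetical stand-ins): caching by logical plan means attaching a second sink to the same Table does not trigger a second translation.

{code:title=Sketch.scala|borderStyle=solid}
import scala.collection.mutable

// Hypothetical sketch of the proposed behaviour: translate each logical
// plan at most once, so several sinks over the same Table share one DAG.
object TranslationCache {
  private val cache = mutable.Map.empty[String, String]
  var translations = 0 // counts how many real translations happened

  def translate(logicalPlan: String): String =
    cache.getOrElseUpdate(logicalPlan, {
      translations += 1
      s"DAG($logicalPlan)" // stand-in for the translated DataSet program
    })
}

// Two sinks over the same plan: only the first call actually translates.
val plan = "scan(csv_source) -> groupBy(first) -> select(first, sum(score))"
val dagForSink1 = TranslationCache.translate(plan)
val dagForSink2 = TranslationCache.translate(plan)
println(s"translations = ${TranslationCache.translations}") // prints "translations = 1"
{code}

In the real Table API this would mean deferring translation of `writeToSink` calls and resolving the shared sub-plan when the job is submitted, rather than eagerly translating inside each `writeToSink`.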