[ 
https://issues.apache.org/jira/browse/FLINK-5858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

godfrey he updated FLINK-5858:
------------------------------
    Description: 
When the writeToSink method is called to write a Table (with a TableSource) 
to a TableSink, the Table is translated to a DataSet or DataStream. If 
writeToSink is called more than once (to write to different sinks), the Table 
is also translated more than once, so the final execution graph is split into 
separate DAGs. For example:

{code:title=Example.scala|borderStyle=solid}
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val csvTableSource = new CsvTableSource(
      "/tmp/words",
      Array("first", "id", "score", "last"),
      Array(
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.DOUBLE_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO
      ),
      fieldDelim = "#"
    )

    tEnv.registerTableSource("csv_source", csvTableSource)
    val resultTable = tEnv.scan("csv_source")
      .groupBy('first)
      .select('first, 'score.sum)

    resultTable.writeToSink(new CsvTableSink("/tmp/wordcount1"))
    resultTable.writeToSink(new CsvTableSink("/tmp/wordcount2"))

    println(tEnv.explain(resultTable))
{code}

result:

== Abstract Syntax Tree ==
LogicalProject(first=[$0], TMP_1=[$1])
  LogicalAggregate(group=[{0}], TMP_0=[SUM($1)])
    LogicalProject(first=[$0], score=[$2])
      LogicalTableScan(table=[[csv_source]])

== Optimized Logical Plan ==
DataSetAggregate(groupBy=[first], select=[first, SUM(score) AS TMP_0])
  BatchTableSourceScan(table=[[csv_source]], fields=[first, score])

== Physical Execution Plan ==
{color:red}
Stage 6 : Data Source
{color}
        content : collect elements with CollectionInputFormat
        Partitioning : RANDOM_PARTITIONED

        Stage 5 : Map
                content : prepare select: (first, SUM(score) AS TMP_0)
                ship_strategy : Forward
                exchange_mode : PIPELINED
                driver_strategy : Map
                Partitioning : RANDOM_PARTITIONED

                Stage 4 : GroupCombine
                        content : groupBy: (first), select: (first, SUM(score) 
AS TMP_0)
                        ship_strategy : Forward
                        exchange_mode : PIPELINED
                        driver_strategy : Sorted Combine
                        Partitioning : RANDOM_PARTITIONED

                        Stage 3 : GroupReduce
                                content : groupBy: (first), select: (first, 
SUM(score) AS TMP_0)
                                ship_strategy : Hash Partition on [0]
                                exchange_mode : PIPELINED
                                driver_strategy : Sorted Group Reduce
                                Partitioning : RANDOM_PARTITIONED

                                Stage 2 : Map
                                        content : to: Row(f0: String, f1: 
Double)
                                        ship_strategy : Forward
                                        exchange_mode : PIPELINED
                                        driver_strategy : Map
                                        Partitioning : RANDOM_PARTITIONED

                                        Stage 1 : Map
                                                content : Map at 
emitDataSet(CsvTableSink.scala:67)
                                                ship_strategy : Forward
                                                exchange_mode : PIPELINED
                                                driver_strategy : Map
                                                Partitioning : 
RANDOM_PARTITIONED

                                                Stage 0 : Data Sink
                                                        content : 
TextOutputFormat (/tmp/wordcount1) - UTF-8
                                                        ship_strategy : Forward
                                                        exchange_mode : 
PIPELINED
                                                        Partitioning : 
RANDOM_PARTITIONED

{color:red}
Stage 13 : Data Source
{color}
        content : collect elements with CollectionInputFormat
        Partitioning : RANDOM_PARTITIONED

        Stage 12 : Map
                content : prepare select: (first, SUM(score) AS TMP_0)
                ship_strategy : Forward
                exchange_mode : PIPELINED
                driver_strategy : Map
                Partitioning : RANDOM_PARTITIONED

                Stage 11 : GroupCombine
                        content : groupBy: (first), select: (first, SUM(score) 
AS TMP_0)
                        ship_strategy : Forward
                        exchange_mode : PIPELINED
                        driver_strategy : Sorted Combine
                        Partitioning : RANDOM_PARTITIONED

                        Stage 10 : GroupReduce
                                content : groupBy: (first), select: (first, 
SUM(score) AS TMP_0)
                                ship_strategy : Hash Partition on [0]
                                exchange_mode : PIPELINED
                                driver_strategy : Sorted Group Reduce
                                Partitioning : RANDOM_PARTITIONED

                                Stage 9 : Map
                                        content : to: Row(f0: String, f1: 
Double)
                                        ship_strategy : Forward
                                        exchange_mode : PIPELINED
                                        driver_strategy : Map
                                        Partitioning : RANDOM_PARTITIONED

                                        Stage 8 : Map
                                                content : Map at 
emitDataSet(CsvTableSink.scala:67)
                                                ship_strategy : Forward
                                                exchange_mode : PIPELINED
                                                driver_strategy : Map
                                                Partitioning : 
RANDOM_PARTITIONED

                                                Stage 7 : Data Sink
                                                        content : 
TextOutputFormat (/tmp/wordcount2) - UTF-8
                                                        ship_strategy : Forward
                                                        exchange_mode : 
PIPELINED
                                                        Partitioning : 
RANDOM_PARTITIONED

{color:red}
Stage 18 : Data Source
{color}
        content : collect elements with CollectionInputFormat
        Partitioning : RANDOM_PARTITIONED

        Stage 17 : Map
                content : prepare select: (first, SUM(score) AS TMP_0)
                ship_strategy : Forward
                exchange_mode : PIPELINED
                driver_strategy : Map
                Partitioning : RANDOM_PARTITIONED

                Stage 16 : GroupCombine
                        content : groupBy: (first), select: (first, SUM(score) 
AS TMP_0)
                        ship_strategy : Forward
                        exchange_mode : PIPELINED
                        driver_strategy : Sorted Combine
                        Partitioning : RANDOM_PARTITIONED

                        Stage 15 : GroupReduce
                                content : groupBy: (first), select: (first, 
SUM(score) AS TMP_0)
                                ship_strategy : Hash Partition on [0]
                                exchange_mode : PIPELINED
                                driver_strategy : Sorted Group Reduce
                                Partitioning : RANDOM_PARTITIONED

                                Stage 14 : Data Sink
                                        content : 
org.apache.flink.api.java.io.DiscardingOutputFormat
                                        ship_strategy : Forward
                                        exchange_mode : PIPELINED
                                        Partitioning : RANDOM_PARTITIONED


  was:
When call writeToSink method to write the Table(with TableSource) to a 
TableSink, the Table was translated to DataSet or DataStream, if we call 
writeToSink(write to different sinks) more than once, the Table was also 
translated more than once. The final execution graph was parted to different 
DAGs. For example:

{code:title=Example.scala|borderStyle=solid}
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val csvTableSource = new CsvTableSource(
      "/tmp/words",
      Array("first", "id", "score", "last"),
      Array(
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.DOUBLE_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO
      ),
      fieldDelim = "#"
    )

    tEnv.registerTableSource("csv_source", csvTableSource)
    val resultTable = tEnv.scan("csv_source")
      .groupBy('first)
      .select('first, 'score.sum)

    resultTable.writeToSink(new CsvTableSink("/tmp/wordcount1"))
    resultTable.writeToSink(new CsvTableSink("/tmp/wordcount2"))

    println(tEnv.explain(resultTable))
{code}

result:

== Abstract Syntax Tree ==
LogicalProject(first=[$0], TMP_1=[$1])
  LogicalAggregate(group=[{0}], TMP_0=[SUM($1)])
    LogicalProject(first=[$0], score=[$2])
      LogicalTableScan(table=[[csv_source]])

== Optimized Logical Plan ==
DataSetAggregate(groupBy=[first], select=[first, SUM(score) AS TMP_0])
  BatchTableSourceScan(table=[[csv_source]], fields=[first, score])

== Physical Execution Plan ==
{color:red}
Stage 6 : Data Source
{color}
        content : collect elements with CollectionInputFormat
        Partitioning : RANDOM_PARTITIONED

        Stage 5 : Map
                content : prepare select: (first, SUM(score) AS TMP_0)
                ship_strategy : Forward
                exchange_mode : PIPELINED
                driver_strategy : Map
                Partitioning : RANDOM_PARTITIONED

                Stage 4 : GroupCombine
                        content : groupBy: (first), select: (first, SUM(score) 
AS TMP_0)
                        ship_strategy : Forward
                        exchange_mode : PIPELINED
                        driver_strategy : Sorted Combine
                        Partitioning : RANDOM_PARTITIONED

                        Stage 3 : GroupReduce
                                content : groupBy: (first), select: (first, 
SUM(score) AS TMP_0)
                                ship_strategy : Hash Partition on [0]
                                exchange_mode : PIPELINED
                                driver_strategy : Sorted Group Reduce
                                Partitioning : RANDOM_PARTITIONED

                                Stage 2 : Map
                                        content : to: Row(f0: String, f1: 
Double)
                                        ship_strategy : Forward
                                        exchange_mode : PIPELINED
                                        driver_strategy : Map
                                        Partitioning : RANDOM_PARTITIONED

                                        Stage 1 : Map
                                                content : Map at 
emitDataSet(CsvTableSink.scala:67)
                                                ship_strategy : Forward
                                                exchange_mode : PIPELINED
                                                driver_strategy : Map
                                                Partitioning : 
RANDOM_PARTITIONED

                                                Stage 0 : Data Sink
                                                        content : 
TextOutputFormat (/tmp/wordcount1) - UTF-8
                                                        ship_strategy : Forward
                                                        exchange_mode : 
PIPELINED
                                                        Partitioning : 
RANDOM_PARTITIONED

{color:red}
Stage 13 : Data Source
{color}
        content : collect elements with CollectionInputFormat
        Partitioning : RANDOM_PARTITIONED

        Stage 12 : Map
                content : prepare select: (first, SUM(score) AS TMP_0)
                ship_strategy : Forward
                exchange_mode : PIPELINED
                driver_strategy : Map
                Partitioning : RANDOM_PARTITIONED

                Stage 11 : GroupCombine
                        content : groupBy: (first), select: (first, SUM(score) 
AS TMP_0)
                        ship_strategy : Forward
                        exchange_mode : PIPELINED
                        driver_strategy : Sorted Combine
                        Partitioning : RANDOM_PARTITIONED

                        Stage 10 : GroupReduce
                                content : groupBy: (first), select: (first, 
SUM(score) AS TMP_0)
                                ship_strategy : Hash Partition on [0]
                                exchange_mode : PIPELINED
                                driver_strategy : Sorted Group Reduce
                                Partitioning : RANDOM_PARTITIONED

                                Stage 9 : Map
                                        content : to: Row(f0: String, f1: 
Double)
                                        ship_strategy : Forward
                                        exchange_mode : PIPELINED
                                        driver_strategy : Map
                                        Partitioning : RANDOM_PARTITIONED

                                        Stage 8 : Map
                                                content : Map at 
emitDataSet(CsvTableSink.scala:67)
                                                ship_strategy : Forward
                                                exchange_mode : PIPELINED
                                                driver_strategy : Map
                                                Partitioning : 
RANDOM_PARTITIONED

                                                Stage 7 : Data Sink
                                                        content : 
TextOutputFormat (/tmp/wordcount2) - UTF-8
                                                        ship_strategy : Forward
                                                        exchange_mode : 
PIPELINED
                                                        Partitioning : 
RANDOM_PARTITIONED

{color:red}
Stage 18 : Data Source
{color}
        content : collect elements with CollectionInputFormat
        Partitioning : RANDOM_PARTITIONED

        Stage 17 : Map
                content : prepare select: (first, SUM(score) AS TMP_0)
                ship_strategy : Forward
                exchange_mode : PIPELINED
                driver_strategy : Map
                Partitioning : RANDOM_PARTITIONED

                Stage 16 : GroupCombine
                        content : groupBy: (first), select: (first, SUM(score) 
AS TMP_0)
                        ship_strategy : Forward
                        exchange_mode : PIPELINED
                        driver_strategy : Sorted Combine
                        Partitioning : RANDOM_PARTITIONED

                        Stage 15 : GroupReduce
                                content : groupBy: (first), select: (first, 
SUM(score) AS TMP_0)
                                ship_strategy : Hash Partition on [0]
                                exchange_mode : PIPELINED
                                driver_strategy : Sorted Group Reduce
                                Partitioning : RANDOM_PARTITIONED

                                Stage 14 : Data Sink
                                        content : 
org.apache.flink.api.java.io.DiscardingOutputFormat
                                        ship_strategy : Forward
                                        exchange_mode : PIPELINED
                                        Partitioning : RANDOM_PARTITIONED


> Support multiple sinks in same execution DAG
> --------------------------------------------
>
>                 Key: FLINK-5858
>                 URL: https://issues.apache.org/jira/browse/FLINK-5858
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: godfrey he
>
> When the writeToSink method is called to write a Table (with a TableSource) 
> to a TableSink, the Table is translated to a DataSet or DataStream. If 
> writeToSink is called more than once (to write to different sinks), the Table 
> is also translated more than once, so the final execution graph is split into 
> separate DAGs. For example:
> {code:title=Example.scala|borderStyle=solid}
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     val csvTableSource = new CsvTableSource(
>       "/tmp/words",
>       Array("first", "id", "score", "last"),
>       Array(
>         BasicTypeInfo.STRING_TYPE_INFO,
>         BasicTypeInfo.INT_TYPE_INFO,
>         BasicTypeInfo.DOUBLE_TYPE_INFO,
>         BasicTypeInfo.STRING_TYPE_INFO
>       ),
>       fieldDelim = "#"
>     )
>     tEnv.registerTableSource("csv_source", csvTableSource)
>     val resultTable = tEnv.scan("csv_source")
>       .groupBy('first)
>       .select('first, 'score.sum)
>     resultTable.writeToSink(new CsvTableSink("/tmp/wordcount1"))
>     resultTable.writeToSink(new CsvTableSink("/tmp/wordcount2"))
>     println(tEnv.explain(resultTable))
> {code}
> result:
> == Abstract Syntax Tree ==
> LogicalProject(first=[$0], TMP_1=[$1])
>   LogicalAggregate(group=[{0}], TMP_0=[SUM($1)])
>     LogicalProject(first=[$0], score=[$2])
>       LogicalTableScan(table=[[csv_source]])
> == Optimized Logical Plan ==
> DataSetAggregate(groupBy=[first], select=[first, SUM(score) AS TMP_0])
>   BatchTableSourceScan(table=[[csv_source]], fields=[first, score])
> == Physical Execution Plan ==
> {color:red}
> Stage 6 : Data Source
> {color}
>       content : collect elements with CollectionInputFormat
>       Partitioning : RANDOM_PARTITIONED
>       Stage 5 : Map
>               content : prepare select: (first, SUM(score) AS TMP_0)
>               ship_strategy : Forward
>               exchange_mode : PIPELINED
>               driver_strategy : Map
>               Partitioning : RANDOM_PARTITIONED
>               Stage 4 : GroupCombine
>                       content : groupBy: (first), select: (first, SUM(score) 
> AS TMP_0)
>                       ship_strategy : Forward
>                       exchange_mode : PIPELINED
>                       driver_strategy : Sorted Combine
>                       Partitioning : RANDOM_PARTITIONED
>                       Stage 3 : GroupReduce
>                               content : groupBy: (first), select: (first, 
> SUM(score) AS TMP_0)
>                               ship_strategy : Hash Partition on [0]
>                               exchange_mode : PIPELINED
>                               driver_strategy : Sorted Group Reduce
>                               Partitioning : RANDOM_PARTITIONED
>                               Stage 2 : Map
>                                       content : to: Row(f0: String, f1: 
> Double)
>                                       ship_strategy : Forward
>                                       exchange_mode : PIPELINED
>                                       driver_strategy : Map
>                                       Partitioning : RANDOM_PARTITIONED
>                                       Stage 1 : Map
>                                               content : Map at 
> emitDataSet(CsvTableSink.scala:67)
>                                               ship_strategy : Forward
>                                               exchange_mode : PIPELINED
>                                               driver_strategy : Map
>                                               Partitioning : 
> RANDOM_PARTITIONED
>                                               Stage 0 : Data Sink
>                                                       content : 
> TextOutputFormat (/tmp/wordcount1) - UTF-8
>                                                       ship_strategy : Forward
>                                                       exchange_mode : 
> PIPELINED
>                                                       Partitioning : 
> RANDOM_PARTITIONED
> {color:red}
> Stage 13 : Data Source
> {color}
>       content : collect elements with CollectionInputFormat
>       Partitioning : RANDOM_PARTITIONED
>       Stage 12 : Map
>               content : prepare select: (first, SUM(score) AS TMP_0)
>               ship_strategy : Forward
>               exchange_mode : PIPELINED
>               driver_strategy : Map
>               Partitioning : RANDOM_PARTITIONED
>               Stage 11 : GroupCombine
>                       content : groupBy: (first), select: (first, SUM(score) 
> AS TMP_0)
>                       ship_strategy : Forward
>                       exchange_mode : PIPELINED
>                       driver_strategy : Sorted Combine
>                       Partitioning : RANDOM_PARTITIONED
>                       Stage 10 : GroupReduce
>                               content : groupBy: (first), select: (first, 
> SUM(score) AS TMP_0)
>                               ship_strategy : Hash Partition on [0]
>                               exchange_mode : PIPELINED
>                               driver_strategy : Sorted Group Reduce
>                               Partitioning : RANDOM_PARTITIONED
>                               Stage 9 : Map
>                                       content : to: Row(f0: String, f1: 
> Double)
>                                       ship_strategy : Forward
>                                       exchange_mode : PIPELINED
>                                       driver_strategy : Map
>                                       Partitioning : RANDOM_PARTITIONED
>                                       Stage 8 : Map
>                                               content : Map at 
> emitDataSet(CsvTableSink.scala:67)
>                                               ship_strategy : Forward
>                                               exchange_mode : PIPELINED
>                                               driver_strategy : Map
>                                               Partitioning : 
> RANDOM_PARTITIONED
>                                               Stage 7 : Data Sink
>                                                       content : 
> TextOutputFormat (/tmp/wordcount2) - UTF-8
>                                                       ship_strategy : Forward
>                                                       exchange_mode : 
> PIPELINED
>                                                       Partitioning : 
> RANDOM_PARTITIONED
> {color:red}
> Stage 18 : Data Source
> {color}
>       content : collect elements with CollectionInputFormat
>       Partitioning : RANDOM_PARTITIONED
>       Stage 17 : Map
>               content : prepare select: (first, SUM(score) AS TMP_0)
>               ship_strategy : Forward
>               exchange_mode : PIPELINED
>               driver_strategy : Map
>               Partitioning : RANDOM_PARTITIONED
>               Stage 16 : GroupCombine
>                       content : groupBy: (first), select: (first, SUM(score) 
> AS TMP_0)
>                       ship_strategy : Forward
>                       exchange_mode : PIPELINED
>                       driver_strategy : Sorted Combine
>                       Partitioning : RANDOM_PARTITIONED
>                       Stage 15 : GroupReduce
>                               content : groupBy: (first), select: (first, 
> SUM(score) AS TMP_0)
>                               ship_strategy : Hash Partition on [0]
>                               exchange_mode : PIPELINED
>                               driver_strategy : Sorted Group Reduce
>                               Partitioning : RANDOM_PARTITIONED
>                               Stage 14 : Data Sink
>                                       content : 
> org.apache.flink.api.java.io.DiscardingOutputFormat
>                                       ship_strategy : Forward
>                                       exchange_mode : PIPELINED
>                                       Partitioning : RANDOM_PARTITIONED



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to