godfrey he created FLINK-5858:
---------------------------------

             Summary: Support multiple sinks in same execution DAG
                 Key: FLINK-5858
                 URL: https://issues.apache.org/jira/browse/FLINK-5858
             Project: Flink
          Issue Type: Improvement
          Components: Table API & SQL
            Reporter: godfrey he


When writeToSink is called to write a Table (backed by a TableSource) to a 
TableSink, the Table is translated to a DataSet or DataStream. If writeToSink 
is called more than once (to write to different sinks), the Table is also 
translated more than once, and the final execution graph is split into 
separate DAGs. For example:

{code:title=Example.scala|borderStyle=solid}
    // Imports assumed for the Flink 1.2-era Table API package layout
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.sinks.CsvTableSink
    import org.apache.flink.table.sources.CsvTableSource

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val csvTableSource = new CsvTableSource(
      "/tmp/words",
      Array("first", "id", "score", "last"),
      Array(
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.DOUBLE_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO
      ),
      fieldDelim = "#"
    )

    tEnv.registerTableSource("csv_source", csvTableSource)
    val resultTable = tEnv.scan("csv_source")
      .groupBy('first)
      .select('first, 'score.sum)

    resultTable.writeToSink(new CsvTableSink("/tmp/wordcount1"))
    resultTable.writeToSink(new CsvTableSink("/tmp/wordcount2"))

    println(tEnv.explain(resultTable))
{code}

Result (each Data Source stage, highlighted in red, starts a separate DAG):

== Abstract Syntax Tree ==
LogicalProject(first=[$0], TMP_1=[$1])
  LogicalAggregate(group=[{0}], TMP_0=[SUM($1)])
    LogicalProject(first=[$0], score=[$2])
      LogicalTableScan(table=[[csv_source]])

== Optimized Logical Plan ==
DataSetAggregate(groupBy=[first], select=[first, SUM(score) AS TMP_0])
  BatchTableSourceScan(table=[[csv_source]], fields=[first, score])

== Physical Execution Plan ==
{color:red}
Stage 6 : Data Source
{color}
        content : collect elements with CollectionInputFormat
        Partitioning : RANDOM_PARTITIONED

        Stage 5 : Map
                content : prepare select: (first, SUM(score) AS TMP_0)
                ship_strategy : Forward
                exchange_mode : PIPELINED
                driver_strategy : Map
                Partitioning : RANDOM_PARTITIONED

                Stage 4 : GroupCombine
                        content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
                        ship_strategy : Forward
                        exchange_mode : PIPELINED
                        driver_strategy : Sorted Combine
                        Partitioning : RANDOM_PARTITIONED

                        Stage 3 : GroupReduce
                                content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
                                ship_strategy : Hash Partition on [0]
                                exchange_mode : PIPELINED
                                driver_strategy : Sorted Group Reduce
                                Partitioning : RANDOM_PARTITIONED

                                Stage 2 : Map
                                        content : to: Row(f0: String, f1: Double)
                                        ship_strategy : Forward
                                        exchange_mode : PIPELINED
                                        driver_strategy : Map
                                        Partitioning : RANDOM_PARTITIONED

                                        Stage 1 : Map
                                                content : Map at emitDataSet(CsvTableSink.scala:67)
                                                ship_strategy : Forward
                                                exchange_mode : PIPELINED
                                                driver_strategy : Map
                                                Partitioning : RANDOM_PARTITIONED

                                                Stage 0 : Data Sink
                                                        content : TextOutputFormat (/tmp/wordcount1) - UTF-8
                                                        ship_strategy : Forward
                                                        exchange_mode : PIPELINED
                                                        Partitioning : RANDOM_PARTITIONED

{color:red}
Stage 13 : Data Source
{color}
        content : collect elements with CollectionInputFormat
        Partitioning : RANDOM_PARTITIONED

        Stage 12 : Map
                content : prepare select: (first, SUM(score) AS TMP_0)
                ship_strategy : Forward
                exchange_mode : PIPELINED
                driver_strategy : Map
                Partitioning : RANDOM_PARTITIONED

                Stage 11 : GroupCombine
                        content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
                        ship_strategy : Forward
                        exchange_mode : PIPELINED
                        driver_strategy : Sorted Combine
                        Partitioning : RANDOM_PARTITIONED

                        Stage 10 : GroupReduce
                                content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
                                ship_strategy : Hash Partition on [0]
                                exchange_mode : PIPELINED
                                driver_strategy : Sorted Group Reduce
                                Partitioning : RANDOM_PARTITIONED

                                Stage 9 : Map
                                        content : to: Row(f0: String, f1: Double)
                                        ship_strategy : Forward
                                        exchange_mode : PIPELINED
                                        driver_strategy : Map
                                        Partitioning : RANDOM_PARTITIONED

                                        Stage 8 : Map
                                                content : Map at emitDataSet(CsvTableSink.scala:67)
                                                ship_strategy : Forward
                                                exchange_mode : PIPELINED
                                                driver_strategy : Map
                                                Partitioning : RANDOM_PARTITIONED

                                                Stage 7 : Data Sink
                                                        content : TextOutputFormat (/tmp/wordcount2) - UTF-8
                                                        ship_strategy : Forward
                                                        exchange_mode : PIPELINED
                                                        Partitioning : RANDOM_PARTITIONED

{color:red}
Stage 18 : Data Source
{color}
        content : collect elements with CollectionInputFormat
        Partitioning : RANDOM_PARTITIONED

        Stage 17 : Map
                content : prepare select: (first, SUM(score) AS TMP_0)
                ship_strategy : Forward
                exchange_mode : PIPELINED
                driver_strategy : Map
                Partitioning : RANDOM_PARTITIONED

                Stage 16 : GroupCombine
                        content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
                        ship_strategy : Forward
                        exchange_mode : PIPELINED
                        driver_strategy : Sorted Combine
                        Partitioning : RANDOM_PARTITIONED

                        Stage 15 : GroupReduce
                                content : groupBy: (first), select: (first, SUM(score) AS TMP_0)
                                ship_strategy : Hash Partition on [0]
                                exchange_mode : PIPELINED
                                driver_strategy : Sorted Group Reduce
                                Partitioning : RANDOM_PARTITIONED

                                Stage 14 : Data Sink
                                        content : org.apache.flink.api.java.io.DiscardingOutputFormat
                                        ship_strategy : Forward
                                        exchange_mode : PIPELINED
                                        Partitioning : RANDOM_PARTITIONED
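
To make the difference concrete, here is a minimal toy sketch in plain Scala. It is not Flink code: Query, Translated, Translator, translatePerSink and translateOnce are hypothetical names invented for illustration. It only models the current behaviour (one translation per writeToSink call, so one DAG per sink) against the requested behaviour (one translation shared by all sinks), and says nothing about how this would actually be implemented in the Table API.

```scala
// Toy model only: these classes are hypothetical and are not Flink APIs.
object MultiSinkSketch {
  // A logical query that has not been translated yet.
  final case class Query(description: String)

  // A translated operator chain; `id` makes distinct translations visible.
  final case class Translated(id: Int, query: Query)

  // Counts how often the translation step runs.
  final class Translator {
    private var count = 0
    def translate(q: Query): Translated = {
      count += 1
      Translated(count, q)
    }
    def translations: Int = count
  }

  // Behaviour described in this issue: every sink triggers its own
  // translation, so every sink ends up in a separate DAG.
  def translatePerSink(t: Translator, q: Query, sinks: Seq[String]): Map[String, Translated] =
    sinks.map(sink => sink -> t.translate(q)).toMap

  // Requested behaviour: translate once and attach all sinks to that result.
  def translateOnce(t: Translator, q: Query, sinks: Seq[String]): Map[String, Translated] = {
    val shared = t.translate(q)
    sinks.map(sink => sink -> shared).toMap
  }

  def main(args: Array[String]): Unit = {
    val query = Query("groupBy(first).select(first, score.sum)")
    val sinks = Seq("/tmp/wordcount1", "/tmp/wordcount2")

    val perSink = new Translator
    val separate = translatePerSink(perSink, query, sinks)
    println(s"per sink: translations=${perSink.translations}, DAGs=${separate.values.toSet.size}")

    val once = new Translator
    val shared = translateOnce(once, query, sinks)
    println(s"shared:   translations=${once.translations}, DAGs=${shared.values.toSet.size}")
  }
}
```

In this model the per-sink variant translates the query twice and yields two distinct results, while the shared variant translates it once and both sinks hang off the same result, which is the shape of execution graph this issue asks for.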


