[ 
https://issues.apache.org/jira/browse/FLINK-6955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16055958#comment-16055958
 ] 

Kaibo Zhou commented on FLINK-6955:
-----------------------------------

[~twalthr], thanks for your comments.

It is difficult to map the output of {{TableEnvironment#explain}} back to the 
expressions the user actually wrote, especially for complex jobs.

For example:

{code}
    val stream = env.fromCollection(data)
    val table = stream.toTable(tEnv, 'pk, 'a)

    val leftTable = table
      .select('pk as 'leftPk, 'a as 'leftA)
      .where('leftA < 3)
    val rightTable = table
      .select('pk as 'rightPk, 'a as 'rightA)
      .where('rightA > 3)
    val rightTableWithPk = rightTable
      .groupBy('rightPk)
      .select('rightPk, 'rightA.max as 'rightA)

    val resultTable = rightTableWithPk
      .join(leftTable)
      .where('leftPk === 'rightPk)
      .groupBy('leftPk)
      .select('leftPk, 'leftA.count)
    resultTable.toDataSet[Row]
{code}

The explained result:
{code}
== Abstract Syntax Tree ==
LogicalProject(leftPk=[$0], TMP_1=[$1])
  LogicalAggregate(group=[{0}], TMP_1=[COUNT($1)])
    LogicalProject(leftPk=[$2], leftA=[$3])
      LogicalFilter(condition=[=($2, $0)])
        LogicalJoin(condition=[true], joinType=[inner])
          LogicalProject(rightPk=[$0], rightA=[AS($1, 'rightA')])
            LogicalAggregate(group=[{0}], TMP_0=[MAX($1)])
              LogicalProject(rightPk=[$0], rightA=[$1])
                LogicalFilter(condition=[>($1, 3)])
                  LogicalProject(rightPk=[AS($0, 'rightPk')], rightA=[AS($1, 'rightA')])
                    LogicalTableScan(table=[[_DataSetTable_0]])
          LogicalFilter(condition=[<($1, 3)])
            LogicalProject(leftPk=[AS($0, 'leftPk')], leftA=[AS($1, 'leftA')])
              LogicalTableScan(table=[[_DataSetTable_0]])

== Optimized Logical Plan ==
DataSetCalc(select=[leftPk, TMP_1])
  DataSetJoin(where=[=(leftPk, rightPk)], join=[rightPk, leftPk, TMP_1], joinType=[InnerJoin])
    DataSetCalc(select=[rightPk])
      DataSetAggregate(groupBy=[rightPk], select=[rightPk, MAX(rightA) AS TMP_0])
        DataSetCalc(select=[pk AS rightPk, a AS rightA], where=[>(a, 3)])
          DataSetScan(table=[[_DataSetTable_0]])
    DataSetAggregate(groupBy=[leftPk], select=[leftPk, COUNT(leftA) AS TMP_1])
      DataSetCalc(select=[pk AS leftPk, a AS leftA], where=[<(a, 3)])
        DataSetScan(table=[[_DataSetTable_0]])

== Physical Execution Plan ==
Stage 8 : Data Source
        content : collect elements with CollectionInputFormat
        Partitioning : RANDOM_PARTITIONED

        Stage 7 : Map
                content : from: (pk, a)
                ship_strategy : Forward
                exchange_mode : BATCH
                driver_strategy : Map
                Partitioning : RANDOM_PARTITIONED

                Stage 6 : FlatMap
                        content : where: (>(a, 3)), select: (pk AS rightPk, a AS rightA)
                        ship_strategy : Forward
                        exchange_mode : PIPELINED
                        driver_strategy : FlatMap
                        Partitioning : RANDOM_PARTITIONED

                        Stage 5 : GroupCombine
                                content : groupBy: (rightPk), select: (rightPk, MAX(rightA) AS TMP_0)
                                ship_strategy : Forward
                                exchange_mode : PIPELINED
                                driver_strategy : Sorted Combine
                                Partitioning : RANDOM_PARTITIONED

                                Stage 4 : GroupReduce
                                        content : groupBy: (rightPk), select: (rightPk, MAX(rightA) AS TMP_0)
                                        ship_strategy : Hash Partition on [0]
                                        exchange_mode : PIPELINED
                                        driver_strategy : Sorted Group Reduce
                                        Partitioning : RANDOM_PARTITIONED

                                        Stage 3 : FlatMap
                                                content : select: (rightPk)
                                                ship_strategy : Forward
                                                exchange_mode : PIPELINED
                                                driver_strategy : FlatMap
                                                Partitioning : RANDOM_PARTITIONED

                                                Stage 12 : Map
                                                        content : from: (pk, a)
                                                        ship_strategy : Forward
                                                        exchange_mode : BATCH
                                                        driver_strategy : Map
                                                        Partitioning : RANDOM_PARTITIONED

                                                        Stage 11 : FlatMap
                                                                content : where: (<(a, 3)), select: (pk AS leftPk, a AS leftA)
                                                                ship_strategy : Forward
                                                                exchange_mode : PIPELINED
                                                                driver_strategy : FlatMap
                                                                Partitioning : RANDOM_PARTITIONED

                                                                Stage 10 : GroupCombine
                                                                        content : groupBy: (leftPk), select: (leftPk, COUNT(leftA) AS TMP_1)
                                                                        ship_strategy : Forward
                                                                        exchange_mode : PIPELINED
                                                                        driver_strategy : Sorted Combine
                                                                        Partitioning : RANDOM_PARTITIONED

                                                                        Stage 9 : GroupReduce
                                                                                content : groupBy: (leftPk), select: (leftPk, COUNT(leftA) AS TMP_1)
                                                                                ship_strategy : Hash Partition on [0]
                                                                                exchange_mode : PIPELINED
                                                                                driver_strategy : Sorted Group Reduce
                                                                                Partitioning : RANDOM_PARTITIONED

                                                                                Stage 2 : Join
                                                                                        content : where: (=(leftPk, rightPk)), join: (rightPk, leftPk, TMP_1)
                                                                                        ship_strategy : Hash Partition on [0]
                                                                                        exchange_mode : PIPELINED
                                                                                        driver_strategy : Hybrid Hash (build: select: (rightPk) (id: 3))
                                                                                        Partitioning : RANDOM_PARTITIONED

                                                                                        Stage 1 : FlatMap
                                                                                                content : select: (leftPk, TMP_1)
                                                                                                ship_strategy : Forward
                                                                                                exchange_mode : PIPELINED
                                                                                                driver_strategy : FlatMap
                                                                                                Partitioning : RANDOM_PARTITIONED

                                                                                                Stage 0 : Data Sink
                                                                                                        content : org.apache.flink.api.java.io.DiscardingOutputFormat
                                                                                                        ship_strategy : Forward
                                                                                                        exchange_mode : PIPELINED
                                                                                                        Partitioning : RANDOM_PARTITIONED
{code}

If the number of nodes reaches tens or hundreds, which is very common in 
production, it takes a long time to work out from this output what the user 
wrote.

If we could print an operation log, the result might be:

{code}
UnnamedTable$2 = UnnamedTable$0.select('pk as 'leftPk, 'a as 'leftA)
  .where('leftA < 3)
UnnamedTable$8 = UnnamedTable$0.select('pk as 'rightPk, 'a as 'rightA)
  .where('rightA > 3)
  .groupBy('rightPk)
  .select('rightPk, max('rightA) as 'rightA)
  .join(UnnamedTable$2)
  .where('leftPk === 'rightPk)
  .groupBy('leftPk)
  .select('leftPk, count('leftA))
{code}

That way, users can see at a glance what they wrote.
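
To make the idea concrete, here is a rough, self-contained sketch of how such a log could be recorded. It is a toy model only, not Flink code: {{NameGen}}, {{LoggedTable}}, {{source}} and {{log}} are hypothetical names, expressions are passed as plain strings, and the rule that {{groupBy}} does not take a new table number is only inferred from the numbering in the example above.

```scala
// Toy model only: none of these classes exist in Flink.
object OperationLogSketch {

  // Hands out names in the style of the log shown above.
  final class NameGen {
    private var next = 0
    def fresh(): String = { val n = "UnnamedTable$" + next; next += 1; n }
  }

  // A table remembers the source it started from, the calls chained onto it,
  // and any other tables those calls referred to (for example a join partner).
  final case class LoggedTable(
      name: String,
      root: String,
      ops: Vector[String],
      deps: Vector[LoggedTable],
      gen: NameGen) {

    // Operations that yield a new table take a fresh name.
    private def step(op: String, extra: Vector[LoggedTable] = Vector.empty): LoggedTable =
      copy(name = gen.fresh(), ops = ops :+ op, deps = deps ++ extra)

    def select(expr: String): LoggedTable = step("select(" + expr + ")")
    def where(expr: String): LoggedTable = step("where(" + expr + ")")
    def join(other: LoggedTable): LoggedTable = step("join(" + other.name + ")", Vector(other))

    // Assumption: groupBy is recorded but keeps the current name.
    def groupBy(expr: String): LoggedTable = copy(ops = ops :+ ("groupBy(" + expr + ")"))

    // Print referenced tables first, then this table's own chain.
    def log: String = {
      val own =
        if (ops.isEmpty) ""
        else name + " = " + root + "." + ops.mkString("\n  .") + "\n"
      deps.map(_.log).mkString + own
    }
  }

  // Creates a source table with no recorded operations.
  def source(gen: NameGen): LoggedTable = {
    val n = gen.fresh()
    LoggedTable(n, n, Vector.empty, Vector.empty, gen)
  }

  def main(args: Array[String]): Unit = {
    val gen = new NameGen
    val table = source(gen)
    val leftTable = table.select("'pk as 'leftPk, 'a as 'leftA").where("'leftA < 3")
    val rightTable = table.select("'pk as 'rightPk, 'a as 'rightA").where("'rightA > 3")
    val resultTable = rightTable
      .groupBy("'rightPk")
      .select("'rightPk, max('rightA) as 'rightA")
      .join(leftTable)
      .where("'leftPk === 'rightPk")
      .groupBy("'leftPk")
      .select("'leftPk, count('leftA)")
    print(resultTable.log)
  }
}
```

A real implementation would have to hook into the Table API itself and avoid printing a table twice when it is referenced from several places; this sketch ignores both.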

> Add operation log for Table
> ---------------------------
>
>                 Key: FLINK-6955
>                 URL: https://issues.apache.org/jira/browse/FLINK-6955
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Kaibo Zhou
>            Assignee: Kaibo Zhou
>
> In some production scenarios, the operations on a Table are very 
> complicated and go through many steps. For example, a Table object is 
> created at the beginning of the program and then passed to different 
> modules at runtime; each module applies some operations to the Table, such 
> as union, join, or filter. At the end, the program calls writeToSink or 
> another output operation.
> It would be helpful to record the operations applied to a Table and to be 
> able to print them out.
> For example:
> {code}
>     val table1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
>     val table2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
>     val unionDs = table1.unionAll(table2.select('a, 'b, 'c)).filter('b < 2).select('c)
>     val results = unionDs.toDataStream[Row]
>     
>     val result = tEnv.getLog
>     val expected =
>       "UnnamedTable$1 = UnnamedTable$0.select('a, 'b, 'c)\n" +
>         "UnnamedTable$5 = UnnamedTable$2.unionAll(UnnamedTable$1)\n" +
>         "  .filter('b < 2)\n" +
>         "  .select('c)\n"
>     assertEquals(expected, result)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
