[ https://issues.apache.org/jira/browse/FLINK-6955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16055958#comment-16055958 ]
Kaibo Zhou commented on FLINK-6955:
-----------------------------------
[~twalthr], thanks for your comments.
The result of {{TableEnvironment#explain}} is difficult to map back to the
actual expressions the user wrote, especially for complex jobs.
For example:
{code}
val stream = env.fromCollection(data)
val table = stream.toTable(tEnv, 'pk, 'a)

// left branch: rows with a < 3
val leftTable = table
  .select('pk as 'leftPk, 'a as 'leftA)
  .where('leftA < 3)

// right branch: rows with a > 3, aggregated per key
val rightTable = table
  .select('pk as 'rightPk, 'a as 'rightA)
  .where('rightA > 3)
val rightTableWithPk = rightTable
  .groupBy('rightPk)
  .select('rightPk, 'rightA.max as 'rightA)

// join both branches and count per left key
val resultTable = rightTableWithPk
  .join(leftTable)
  .where('leftPk === 'rightPk)
  .groupBy('leftPk)
  .select('leftPk, 'leftA.count)

resultTable.toDataSet[Row]
{code}
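For reference, the output below was produced by printing the result of
{{TableEnvironment#explain}} for this job, roughly like this (using the
{{tEnv}} and {{resultTable}} from the snippet above):
{code}
// prints the AST, the optimized logical plan, and the physical execution plan
println(tEnv.explain(resultTable))
{code}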
The explain output:
{code}
== Abstract Syntax Tree ==
LogicalProject(leftPk=[$0], TMP_1=[$1])
  LogicalAggregate(group=[{0}], TMP_1=[COUNT($1)])
    LogicalProject(leftPk=[$2], leftA=[$3])
      LogicalFilter(condition=[=($2, $0)])
        LogicalJoin(condition=[true], joinType=[inner])
          LogicalProject(rightPk=[$0], rightA=[AS($1, 'rightA')])
            LogicalAggregate(group=[{0}], TMP_0=[MAX($1)])
              LogicalProject(rightPk=[$0], rightA=[$1])
                LogicalFilter(condition=[>($1, 3)])
                  LogicalProject(rightPk=[AS($0, 'rightPk')], rightA=[AS($1, 'rightA')])
                    LogicalTableScan(table=[[_DataSetTable_0]])
          LogicalFilter(condition=[<($1, 3)])
            LogicalProject(leftPk=[AS($0, 'leftPk')], leftA=[AS($1, 'leftA')])
              LogicalTableScan(table=[[_DataSetTable_0]])
== Optimized Logical Plan ==
DataSetCalc(select=[leftPk, TMP_1])
  DataSetJoin(where=[=(leftPk, rightPk)], join=[rightPk, leftPk, TMP_1], joinType=[InnerJoin])
    DataSetCalc(select=[rightPk])
      DataSetAggregate(groupBy=[rightPk], select=[rightPk, MAX(rightA) AS TMP_0])
        DataSetCalc(select=[pk AS rightPk, a AS rightA], where=[>(a, 3)])
          DataSetScan(table=[[_DataSetTable_0]])
    DataSetAggregate(groupBy=[leftPk], select=[leftPk, COUNT(leftA) AS TMP_1])
      DataSetCalc(select=[pk AS leftPk, a AS leftA], where=[<(a, 3)])
        DataSetScan(table=[[_DataSetTable_0]])
== Physical Execution Plan ==
Stage 8 : Data Source
  content : collect elements with CollectionInputFormat
  Partitioning : RANDOM_PARTITIONED

  Stage 7 : Map
    content : from: (pk, a)
    ship_strategy : Forward
    exchange_mode : BATCH
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

    Stage 6 : FlatMap
      content : where: (>(a, 3)), select: (pk AS rightPk, a AS rightA)
      ship_strategy : Forward
      exchange_mode : PIPELINED
      driver_strategy : FlatMap
      Partitioning : RANDOM_PARTITIONED

      Stage 5 : GroupCombine
        content : groupBy: (rightPk), select: (rightPk, MAX(rightA) AS TMP_0)
        ship_strategy : Forward
        exchange_mode : PIPELINED
        driver_strategy : Sorted Combine
        Partitioning : RANDOM_PARTITIONED

        Stage 4 : GroupReduce
          content : groupBy: (rightPk), select: (rightPk, MAX(rightA) AS TMP_0)
          ship_strategy : Hash Partition on [0]
          exchange_mode : PIPELINED
          driver_strategy : Sorted Group Reduce
          Partitioning : RANDOM_PARTITIONED

          Stage 3 : FlatMap
            content : select: (rightPk)
            ship_strategy : Forward
            exchange_mode : PIPELINED
            driver_strategy : FlatMap
            Partitioning : RANDOM_PARTITIONED

  Stage 12 : Map
    content : from: (pk, a)
    ship_strategy : Forward
    exchange_mode : BATCH
    driver_strategy : Map
    Partitioning : RANDOM_PARTITIONED

    Stage 11 : FlatMap
      content : where: (<(a, 3)), select: (pk AS leftPk, a AS leftA)
      ship_strategy : Forward
      exchange_mode : PIPELINED
      driver_strategy : FlatMap
      Partitioning : RANDOM_PARTITIONED

      Stage 10 : GroupCombine
        content : groupBy: (leftPk), select: (leftPk, COUNT(leftA) AS TMP_1)
        ship_strategy : Forward
        exchange_mode : PIPELINED
        driver_strategy : Sorted Combine
        Partitioning : RANDOM_PARTITIONED

        Stage 9 : GroupReduce
          content : groupBy: (leftPk), select: (leftPk, COUNT(leftA) AS TMP_1)
          ship_strategy : Hash Partition on [0]
          exchange_mode : PIPELINED
          driver_strategy : Sorted Group Reduce
          Partitioning : RANDOM_PARTITIONED

          Stage 2 : Join
            content : where: (=(leftPk, rightPk)), join: (rightPk, leftPk, TMP_1)
            ship_strategy : Hash Partition on [0]
            exchange_mode : PIPELINED
            driver_strategy : Hybrid Hash (build: select: (rightPk) (id: 3))
            Partitioning : RANDOM_PARTITIONED

            Stage 1 : FlatMap
              content : select: (leftPk, TMP_1)
              ship_strategy : Forward
              exchange_mode : PIPELINED
              driver_strategy : FlatMap
              Partitioning : RANDOM_PARTITIONED

              Stage 0 : Data Sink
                content : org.apache.flink.api.java.io.DiscardingOutputFormat
                ship_strategy : Forward
                exchange_mode : PIPELINED
                Partitioning : RANDOM_PARTITIONED
{code}
If the plan contains tens or hundreds of nodes, which is very common in
production environments, it takes a long time to work out what the user
actually wrote. If we could print an operation log instead, the output might
look like this:
{code}
UnnamedTable$2 = UnnamedTable$0.select('pk as 'leftPk, 'a as 'leftA)
  .where('leftA < 3)

UnnamedTable$8 = UnnamedTable$0.select('pk as 'rightPk, 'a as 'rightA)
  .where('rightA > 3)
  .groupBy('rightPk)
  .select('rightPk, max('rightA) as 'rightA)
  .join(UnnamedTable$2)
  .where('leftPk === 'rightPk)
  .groupBy('leftPk)
  .select('leftPk, count('leftA))
{code}
That way, users can see at a glance what they wrote.
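To make the idea concrete, here is a minimal, self-contained sketch of how
such recording could work. Everything in it ({{OperationLog}}, {{nextName}},
{{record}}, {{getLog}}) is hypothetical and not an existing Flink API; a real
implementation would hook into the {{Table}} operations themselves:
{code}
// Hypothetical sketch only -- illustrates the proposed recording, nothing more.
object OperationLog {
  private val log = new StringBuilder
  private var counter = -1

  // Hand out auto-generated table names: UnnamedTable$0, UnnamedTable$1, ...
  def nextName(): String = {
    counter += 1
    s"UnnamedTable$$$counter"
  }

  // Record one operation that derives `child` from `parent`.
  def record(child: String, parent: String, op: String): Unit =
    log.append(s"$child = $parent.$op\n")

  // Everything recorded so far, mirroring the proposed tEnv.getLog.
  def getLog: String = log.toString
}

object OperationLogDemo extends App {
  val t0 = OperationLog.nextName() // the table created by toTable(...)
  val t1 = OperationLog.nextName()
  OperationLog.record(t1, t0, "select('pk as 'leftPk, 'a as 'leftA).where('leftA < 3)")
  print(OperationLog.getLog)
  // prints: UnnamedTable$1 = UnnamedTable$0.select('pk as 'leftPk, 'a as 'leftA).where('leftA < 3)
}
{code}
A real implementation would presumably keep this state per TableEnvironment,
so that {{tEnv.getLog}} can return it, as in the issue description below.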
> Add operation log for Table
> ---------------------------
>
> Key: FLINK-6955
> URL: https://issues.apache.org/jira/browse/FLINK-6955
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Kaibo Zhou
> Assignee: Kaibo Zhou
>
> In real production scenarios, the operations on a Table can be very
> complicated and go through a number of steps. For example, a Table object
> is generated at the beginning of the program and, as the job runs, is
> passed to different modules, each of which performs some operations on the
> Table, such as union, join or filter. At the end of the program,
> writeToSink or some other operation is called.
> The goal is to record the operations applied to a Table so that they can
> be printed out.
> E.g.:
> {code}
> val table1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
> val table2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
> val unionDs = table1.unionAll(table2.select('a, 'b, 'c)).filter('b < 2).select('c)
> val results = unionDs.toDataStream[Row]
>
> val result = tEnv.getLog
> val expected =
>   "UnnamedTable$1 = UnnamedTable$0.select('a, 'b, 'c)\n" +
>   "UnnamedTable$5 = UnnamedTable$2.unionAll(UnnamedTable$1)\n" +
>   "  .filter('b < 2)\n" +
>   "  .select('c)\n"
> assertEquals(expected, result)
> {code}