[ 
https://issues.apache.org/jira/browse/HIVE-17087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16095886#comment-16095886
 ] 

liyunzhang_intel commented on HIVE-17087:
-----------------------------------------

[~stakiar]: One question about the patch:
{noformat}
/* Two of the optimization rules, ConvertJoinMapJoin and RemoveDynamicPruningBySize, are put into
   stats dependent optimizations and run together in TezCompiler. There's no guarantee which one
   runs first, but in either case, the prior one may have removed a chain which the latter one is
   not aware of. So we need to remember the leaf node(s) of that chain so it can be skipped.

   For example, as ConvertJoinMapJoin is removing the reduce sink, it may also have removed a
   dynamic partition pruning operator chain. However, RemoveDynamicPruningBySize doesn't know this
   and still tries to traverse that removed chain which will cause NPE.

   This may also happen when RemoveDynamicPruningBySize happens first.
 */
public HashSet<SparkPartitionPruningSinkOperator> pruningOpsRemovedByPriorOpt;
{noformat}
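
To make the bookkeeping described in that comment concrete, here is a minimal sketch, not the patch's actual code. It assumes operators can be stood in for by plain strings; the class and the methods {{recordRemoval}} and {{leavesToVisit}} are hypothetical names chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RemovedChainSketch {
    // Hypothetical: the earlier rule calls this when it drops a pruning chain,
    // recording the chain's leaf so later rules can tell it is gone.
    static void recordRemoval(Set<String> removedLeaves, String leaf) {
        removedLeaves.add(leaf);
    }

    // Hypothetical: the later rule filters its candidates against the shared set
    // instead of traversing a chain that no longer exists.
    static List<String> leavesToVisit(List<String> candidates, Set<String> removedLeaves) {
        List<String> toVisit = new ArrayList<>();
        for (String leaf : candidates) {
            if (removedLeaves.contains(leaf)) {
                continue; // chain already removed by the prior optimization
            }
            toVisit.add(leaf);
        }
        return toVisit;
    }

    public static void main(String[] args) {
        Set<String> removed = new HashSet<>();
        recordRemoval(removed, "pruningSink-A");
        System.out.println(leavesToVisit(List.of("pruningSink-A", "pruningSink-B"), removed));
    }
}
```

The skip is keyed on the leaf only, which matches the comment's "remember the leaf node(s) of that chain"; whether the real patch checks anything beyond the leaf is not something this sketch claims.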

[ConvertJoinMapJoin|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java#L155] is inserted into opRules first, followed by [RemoveDynamicPruningBySize|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java#L165]. Since opRules is a LinkedHashMap, which preserves insertion order, why does the comment say "There's no guarantee which one runs first"?
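
For reference, the insertion-order property this question relies on can be checked with a small standalone sketch. The map below is only a stand-in for opRules: the string keys and values are placeholders, not Hive's actual rule or processor types. It shows the iteration order of the map itself and says nothing about how the rule dispatcher or graph walker decides which rule fires on which operator.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OpRulesOrderSketch {
    // Builds a stand-in for opRules and returns its keys in iteration order.
    static List<String> iterationOrder() {
        Map<String, String> opRules = new LinkedHashMap<>();
        // Placeholder entries, inserted in the same order as in TezCompiler.
        opRules.put("ConvertJoinMapJoin", "placeholder processor");
        opRules.put("RemoveDynamicPruningBySize", "placeholder processor");
        // LinkedHashMap iterates its keys in the order they were first inserted.
        return new ArrayList<>(opRules.keySet());
    }

    public static void main(String[] args) {
        System.out.println(iterationOrder());
    }
}
```

If the order in which the two optimizations actually run depends on the traversal of the operator tree rather than on the map's iteration order, that would explain the wording of the comment, but that is an assumption to be confirmed against the dispatcher code.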


> Remove unnecessary HoS DPP trees during map-join conversion
> -----------------------------------------------------------
>
>                 Key: HIVE-17087
>                 URL: https://issues.apache.org/jira/browse/HIVE-17087
>             Project: Hive
>          Issue Type: Sub-task
>          Components: Spark
>            Reporter: Sahil Takiar
>            Assignee: Sahil Takiar
>         Attachments: HIVE-17087.1.patch
>
>
> Ran the following query in the {{TestSparkCliDriver}}:
> {code:sql}
> set hive.spark.dynamic.partition.pruning=true;
> set hive.auto.convert.join=true;
> create table partitioned_table1 (col int) partitioned by (part_col int);
> create table partitioned_table2 (col int) partitioned by (part_col int);
> create table regular_table (col int);
> insert into table regular_table values (1);
> alter table partitioned_table1 add partition (part_col = 1);
> insert into table partitioned_table1 partition (part_col = 1) values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10);
> alter table partitioned_table2 add partition (part_col = 1);
> insert into table partitioned_table2 partition (part_col = 1) values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10);
> explain select * from partitioned_table1 where partitioned_table1.part_col in (select regular_table.col from regular_table join partitioned_table2 on regular_table.col = partitioned_table2.part_col);
> {code}
> and got the following explain plan:
> {code}
> STAGE DEPENDENCIES:
>   Stage-2 is a root stage
>   Stage-4 depends on stages: Stage-2
>   Stage-5 depends on stages: Stage-4
>   Stage-3 depends on stages: Stage-5
>   Stage-1 depends on stages: Stage-3
>   Stage-0 depends on stages: Stage-1
> STAGE PLANS:
>   Stage: Stage-2
>     Spark
> #### A masked pattern was here ####
>       Vertices:
>         Map 4 
>             Map Operator Tree:
>                 TableScan
>                   alias: partitioned_table1
>                   Statistics: Num rows: 10 Data size: 11 Basic stats: 
> COMPLETE Column stats: NONE
>                   Select Operator
>                     expressions: col (type: int), part_col (type: int)
>                     outputColumnNames: _col0, _col1
>                     Statistics: Num rows: 10 Data size: 11 Basic stats: 
> COMPLETE Column stats: NONE
>                     Select Operator
>                       expressions: _col1 (type: int)
>                       outputColumnNames: _col0
>                       Statistics: Num rows: 10 Data size: 11 Basic stats: 
> COMPLETE Column stats: NONE
>                       Group By Operator
>                         keys: _col0 (type: int)
>                         mode: hash
>                         outputColumnNames: _col0
>                         Statistics: Num rows: 10 Data size: 11 Basic stats: 
> COMPLETE Column stats: NONE
>                         Spark Partition Pruning Sink Operator
>                           partition key expr: part_col
>                           Statistics: Num rows: 10 Data size: 11 Basic stats: 
> COMPLETE Column stats: NONE
>                           target column name: part_col
>                           target work: Map 3
>   Stage: Stage-4
>     Spark
> #### A masked pattern was here ####
>       Vertices:
>         Map 2 
>             Map Operator Tree:
>                 TableScan
>                   alias: regular_table
>                   Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE 
> Column stats: NONE
>                   Filter Operator
>                     predicate: col is not null (type: boolean)
>                     Statistics: Num rows: 1 Data size: 1 Basic stats: 
> COMPLETE Column stats: NONE
>                     Select Operator
>                       expressions: col (type: int)
>                       outputColumnNames: _col0
>                       Statistics: Num rows: 1 Data size: 1 Basic stats: 
> COMPLETE Column stats: NONE
>                       Spark HashTable Sink Operator
>                         keys:
>                           0 _col0 (type: int)
>                           1 _col0 (type: int)
>                       Select Operator
>                         expressions: _col0 (type: int)
>                         outputColumnNames: _col0
>                         Statistics: Num rows: 1 Data size: 1 Basic stats: 
> COMPLETE Column stats: NONE
>                         Group By Operator
>                           keys: _col0 (type: int)
>                           mode: hash
>                           outputColumnNames: _col0
>                           Statistics: Num rows: 1 Data size: 1 Basic stats: 
> COMPLETE Column stats: NONE
>                           Spark Partition Pruning Sink Operator
>                             partition key expr: part_col
>                             Statistics: Num rows: 1 Data size: 1 Basic stats: 
> COMPLETE Column stats: NONE
>                             target column name: part_col
>                             target work: Map 3
>             Local Work:
>               Map Reduce Local Work
>   Stage: Stage-5
>     Spark
> #### A masked pattern was here ####
>   Stage: Stage-3
>     Spark
> #### A masked pattern was here ####
>       Vertices:
>         Map 3 
>             Map Operator Tree:
>                 TableScan
>                   alias: partitioned_table2
>                   Statistics: Num rows: 10 Data size: 11 Basic stats: 
> COMPLETE Column stats: NONE
>                   Select Operator
>                     expressions: part_col (type: int)
>                     outputColumnNames: _col0
>                     Statistics: Num rows: 10 Data size: 11 Basic stats: 
> COMPLETE Column stats: NONE
>                     Map Join Operator
>                       condition map:
>                            Inner Join 0 to 1
>                       keys:
>                         0 _col0 (type: int)
>                         1 _col0 (type: int)
>                       outputColumnNames: _col0
>                       input vertices:
>                         0 Map 2
>                       Statistics: Num rows: 11 Data size: 12 Basic stats: 
> COMPLETE Column stats: NONE
>                       Group By Operator
>                         keys: _col0 (type: int)
>                         mode: hash
>                         outputColumnNames: _col0
>                         Statistics: Num rows: 11 Data size: 12 Basic stats: 
> COMPLETE Column stats: NONE
>                         Spark HashTable Sink Operator
>                           keys:
>                             0 _col1 (type: int)
>                             1 _col0 (type: int)
>             Local Work:
>               Map Reduce Local Work
>   Stage: Stage-1
>     Spark
> #### A masked pattern was here ####
>       Vertices:
>         Map 1 
>             Map Operator Tree:
>                 TableScan
>                   alias: partitioned_table1
>                   Statistics: Num rows: 10 Data size: 11 Basic stats: 
> COMPLETE Column stats: NONE
>                   Select Operator
>                     expressions: col (type: int), part_col (type: int)
>                     outputColumnNames: _col0, _col1
>                     Statistics: Num rows: 10 Data size: 11 Basic stats: 
> COMPLETE Column stats: NONE
>                     Map Join Operator
>                       condition map:
>                            Left Semi Join 0 to 1
>                       keys:
>                         0 _col1 (type: int)
>                         1 _col0 (type: int)
>                       outputColumnNames: _col0, _col1
>                       input vertices:
>                         1 Map 3
>                       Statistics: Num rows: 12 Data size: 13 Basic stats: 
> COMPLETE Column stats: NONE
>                       File Output Operator
>                         compressed: false
>                         Statistics: Num rows: 12 Data size: 13 Basic stats: 
> COMPLETE Column stats: NONE
>                         table:
>                             input format: 
> org.apache.hadoop.mapred.SequenceFileInputFormat
>                             output format: 
> org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>                             serde: 
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>             Local Work:
>               Map Reduce Local Work
>   Stage: Stage-0
>     Fetch Operator
>       limit: -1
>       Processor Tree:
>         ListSink
> {code}
> I see a couple of weird things in the above explain plan:
> * I don't think there should be a partitioned_table1 scan -> Spark Partition Pruning Sink
> * I'm not sure what is happening with Stage-5 of the explain plan
> For reference, here is the explain plan for the equivalent query in Hive-on-Tez:
> {code}
> STAGE DEPENDENCIES:
>   Stage-1 is a root stage
>   Stage-0 depends on stages: Stage-1
> STAGE PLANS:
>   Stage: Stage-1
>     Tez
> #### A masked pattern was here ####
>       Edges:
>         Map 1 <- Map 3 (BROADCAST_EDGE)
>         Map 3 <- Map 2 (BROADCAST_EDGE)
> #### A masked pattern was here ####
>       Vertices:
>         Map 1 
>             Map Operator Tree:
>                 TableScan
>                   alias: partitioned_table1
>                   Statistics: Num rows: 10 Data size: 51 Basic stats: 
> COMPLETE Column stats: PARTIAL
>                   Select Operator
>                     expressions: col (type: int), part_col (type: int)
>                     outputColumnNames: _col0, _col1
>                     Statistics: Num rows: 10 Data size: 40 Basic stats: 
> COMPLETE Column stats: PARTIAL
>                     Map Join Operator
>                       condition map:
>                            Left Semi Join 0 to 1
>                       keys:
>                         0 _col1 (type: int)
>                         1 _col0 (type: int)
>                       outputColumnNames: _col0, _col1
>                       input vertices:
>                         1 Map 3
>                       Statistics: Num rows: 12 Data size: 48 Basic stats: 
> COMPLETE Column stats: NONE
>                       File Output Operator
>                         compressed: false
>                         Statistics: Num rows: 12 Data size: 48 Basic stats: 
> COMPLETE Column stats: NONE
>                         table:
>                             input format: 
> org.apache.hadoop.mapred.SequenceFileInputFormat
>                             output format: 
> org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>                             serde: 
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>             Execution mode: llap
>             LLAP IO: no inputs
>         Map 2 
>             Map Operator Tree:
>                 TableScan
>                   alias: regular_table
>                   Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE 
> Column stats: NONE
>                   Filter Operator
>                     predicate: col is not null (type: boolean)
>                     Statistics: Num rows: 1 Data size: 1 Basic stats: 
> COMPLETE Column stats: NONE
>                     Select Operator
>                       expressions: col (type: int)
>                       outputColumnNames: _col0
>                       Statistics: Num rows: 1 Data size: 1 Basic stats: 
> COMPLETE Column stats: NONE
>                       Reduce Output Operator
>                         key expressions: _col0 (type: int)
>                         sort order: +
>                         Map-reduce partition columns: _col0 (type: int)
>                         Statistics: Num rows: 1 Data size: 1 Basic stats: 
> COMPLETE Column stats: NONE
>                       Select Operator
>                         expressions: _col0 (type: int)
>                         outputColumnNames: _col0
>                         Statistics: Num rows: 1 Data size: 1 Basic stats: 
> COMPLETE Column stats: NONE
>                         Group By Operator
>                           keys: _col0 (type: int)
>                           mode: hash
>                           outputColumnNames: _col0
>                           Statistics: Num rows: 1 Data size: 1 Basic stats: 
> COMPLETE Column stats: NONE
>                           Dynamic Partitioning Event Operator
>                             Target column: part_col (int)
>                             Target Input: partitioned_table2
>                             Partition key expr: part_col
>                             Statistics: Num rows: 1 Data size: 1 Basic stats: 
> COMPLETE Column stats: NONE
>                             Target Vertex: Map 3
>             Execution mode: llap
>             LLAP IO: no inputs
>         Map 3 
>             Map Operator Tree:
>                 TableScan
>                   alias: partitioned_table2
>                   Statistics: Num rows: 10 Data size: 51 Basic stats: 
> COMPLETE Column stats: COMPLETE
>                   Select Operator
>                     expressions: part_col (type: int)
>                     outputColumnNames: _col0
>                     Statistics: Num rows: 10 Data size: 40 Basic stats: 
> COMPLETE Column stats: COMPLETE
>                     Map Join Operator
>                       condition map:
>                            Inner Join 0 to 1
>                       keys:
>                         0 _col0 (type: int)
>                         1 _col0 (type: int)
>                       outputColumnNames: _col0
>                       input vertices:
>                         0 Map 2
>                       Statistics: Num rows: 11 Data size: 44 Basic stats: 
> COMPLETE Column stats: NONE
>                       Group By Operator
>                         keys: _col0 (type: int)
>                         mode: hash
>                         outputColumnNames: _col0
>                         Statistics: Num rows: 11 Data size: 44 Basic stats: 
> COMPLETE Column stats: NONE
>                         Reduce Output Operator
>                           key expressions: _col0 (type: int)
>                           sort order: +
>                           Map-reduce partition columns: _col0 (type: int)
>                           Statistics: Num rows: 11 Data size: 44 Basic stats: 
> COMPLETE Column stats: NONE
>                         Select Operator
>                           expressions: _col0 (type: int)
>                           outputColumnNames: _col0
>                           Statistics: Num rows: 11 Data size: 44 Basic stats: 
> COMPLETE Column stats: NONE
>                           Group By Operator
>                             keys: _col0 (type: int)
>                             mode: hash
>                             outputColumnNames: _col0
>                             Statistics: Num rows: 11 Data size: 44 Basic 
> stats: COMPLETE Column stats: NONE
>                             Dynamic Partitioning Event Operator
>                               Target column: part_col (int)
>                               Target Input: partitioned_table1
>                               Partition key expr: part_col
>                               Statistics: Num rows: 11 Data size: 44 Basic 
> stats: COMPLETE Column stats: NONE
>                               Target Vertex: Map 1
>             Execution mode: llap
>             LLAP IO: no inputs
>   Stage: Stage-0
>     Fetch Operator
>       limit: -1
>       Processor Tree:
>         ListSink
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
