[ 
https://issues.apache.org/jira/browse/HIVE-17018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076105#comment-16076105
 ] 

liyunzhang_intel commented on HIVE-17018:
-----------------------------------------

[~csun]: I attached an example (HIVE-17018_data_init.q, HIVE-17018.q).
The query for HIVE-17018:
{code}
set hive.auto.convert.join.noconditionaltask.size=460;
explain select * from src,t1,t2,t3 where src.key=t1.key1 and src.value=t2.key2 
and t1.value1=t3.value3;
{code}

The explain output:
{code}
STAGE DEPENDENCIES:
  Stage-2 is a root stage
  Stage-3 depends on stages: Stage-2
  Stage-1 depends on stages: Stage-3
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-2
    Spark
      DagName: root_20170706020353_8bc59675-8374-4ec7-ab9c-4904cd5fcadb:2
      Vertices:
        Map 5 
            Map Operator Tree:
                TableScan
                  alias: t3
                  Statistics: Num rows: 2 Data size: 460 Basic stats: COMPLETE Column stats: NONE
                  Filter Operator
                    predicate: value3 is not null (type: boolean)
                    Statistics: Num rows: 2 Data size: 460 Basic stats: COMPLETE Column stats: NONE
                    Spark HashTable Sink Operator
                      keys:
                        0 _col6 (type: string)
                        1 value3 (type: string)
            Local Work:
              Map Reduce Local Work

  Stage: Stage-3
    Spark
      DagName: root_20170706020353_8bc59675-8374-4ec7-ab9c-4904cd5fcadb:3
      Vertices:
        Map 3 
            Map Operator Tree:
                TableScan
                  alias: t1
                  Statistics: Num rows: 2 Data size: 460 Basic stats: COMPLETE Column stats: NONE
                  Filter Operator
                    predicate: (key1 is not null and value1 is not null) (type: boolean)
                    Statistics: Num rows: 2 Data size: 460 Basic stats: COMPLETE Column stats: NONE
                    Spark HashTable Sink Operator
                      keys:
                        0 key (type: string)
                        1 key1 (type: string)
            Local Work:
              Map Reduce Local Work

  Stage: Stage-1
    Spark
      Edges:
        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 12), Map 4 (PARTITION-LEVEL SORT, 12)
      DagName: root_20170706020353_8bc59675-8374-4ec7-ab9c-4904cd5fcadb:1
      Vertices:
        Map 1 
            Map Operator Tree:
                TableScan
                  alias: src
                  Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
                  Filter Operator
                    predicate: (key is not null and value is not null) (type: boolean)
                    Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
                    Map Join Operator
                      condition map:
                           Inner Join 0 to 1
                      keys:
                        0 key (type: string)
                        1 key1 (type: string)
                      outputColumnNames: _col0, _col1, _col5, _col6
                      input vertices:
                        1 Map 3
                      Statistics: Num rows: 31 Data size: 6393 Basic stats: COMPLETE Column stats: NONE
                      Reduce Output Operator
                        key expressions: _col1 (type: string)
                        sort order: +
                        Map-reduce partition columns: _col1 (type: string)
                        Statistics: Num rows: 31 Data size: 6393 Basic stats: COMPLETE Column stats: NONE
                        value expressions: _col0 (type: string), _col5 (type: string), _col6 (type: string)
            Local Work:
              Map Reduce Local Work
        Map 4 
            Map Operator Tree:
                TableScan
                  alias: t2
                  Statistics: Num rows: 2 Data size: 460 Basic stats: COMPLETE Column stats: NONE
                  Filter Operator
                    predicate: key2 is not null (type: boolean)
                    Statistics: Num rows: 2 Data size: 460 Basic stats: COMPLETE Column stats: NONE
                    Reduce Output Operator
                      key expressions: key2 (type: string)
                      sort order: +
                      Map-reduce partition columns: key2 (type: string)
                      Statistics: Num rows: 2 Data size: 460 Basic stats: COMPLETE Column stats: NONE
                      value expressions: value2 (type: string)
        Reducer 2 
            Local Work:
              Map Reduce Local Work
            Reduce Operator Tree:
              Join Operator
                condition map:
                     Inner Join 0 to 1
                keys:
                  0 _col1 (type: string)
                  1 key2 (type: string)
                outputColumnNames: _col0, _col1, _col5, _col6, _col10, _col11
                Statistics: Num rows: 34 Data size: 7032 Basic stats: COMPLETE Column stats: NONE
                Map Join Operator
                  condition map:
                       Inner Join 0 to 1
                  keys:
                    0 _col6 (type: string)
                    1 value3 (type: string)
                  outputColumnNames: _col0, _col1, _col5, _col6, _col10, _col11, _col15, _col16
                  input vertices:
                    1 Map 5
                  Statistics: Num rows: 37 Data size: 7735 Basic stats: COMPLETE Column stats: NONE
                  Filter Operator
                    predicate: ((_col0 = _col5) and (_col1 = _col10) and (_col6 = _col16)) (type: boolean)
                    Statistics: Num rows: 4 Data size: 836 Basic stats: COMPLETE Column stats: NONE
                    Select Operator
                      expressions: _col0 (type: string), _col1 (type: string), _col5 (type: string), _col6 (type: string), _col10 (type: string), _col11 (type: string), _col15 (type: string), _col16 (type: string)
                      outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6, _col7
                      Statistics: Num rows: 4 Data size: 836 Basic stats: COMPLETE Column stats: NONE
                      File Output Operator
                        compressed: false
                        Statistics: Num rows: 4 Data size: 836 Basic stats: COMPLETE Column stats: NONE
                        table:
                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

{code}
There are 4 tables: src (TS\[0\], size 6393), t1 (TS\[1\], size 460), t2 (TS\[2\], size 460) and t3 (TS\[3\], size 460).
{{hive.auto.convert.join.noconditionaltask.size}} is 460, so only t3 should be converted to a map join. But the explain output above shows that both t3 and t1 are converted to map joins.
Let me explain in more detail.
[maxSize|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java#L188]: 460
The logical plan is:
{code}
TS[0]-FIL[23]-RS[5]-JOIN[8]-RS[10]-JOIN[13]-RS[15]-JOIN[18]-FIL[22]-SEL[20]-FS[21]
TS[1]-FIL[24]-RS[7]-JOIN[8]
TS[2]-FIL[25]-RS[12]-JOIN[13]
TS[3]-FIL[26]-RS[17]-JOIN[18]
{code}

After TS\[3\] is converted to a map join, the plan becomes:
{code}
TS[0]-FIL[23]-RS[5]-JOIN[8]-RS[10]-JOIN[13]-MAPJOIN[27]-FIL[22]-SEL[20]-FS[21]
TS[1]-FIL[24]-RS[7]-JOIN[8]
TS[2]-FIL[25]-RS[12]-JOIN[13]
TS[3]-FIL[26]-RS[17]-JOIN[18]
{code}

TS\[2\] cannot be converted to a map join because {{connectedMapJoinSize + sizeOf(TS\[2\])}} > maxSize (460 + 460 > 460), but TS\[1\] is converted to a map join:
{code}
TS[0]-FIL[23]-MAPJOIN[28]-RS[10]-JOIN[13]-MAPJOIN[27]-FIL[22]-SEL[20]-FS[21]
TS[1]-FIL[24]-RS[7]-JOIN[8]
TS[2]-FIL[25]-RS[12]-JOIN[13]
TS[3]-FIL[26]-RS[17]-JOIN[18]
{code}


The reason why TS\[1\] can be converted to a map join (that is, why JOIN\[8\] can be converted to a map join):
{{SparkMapJoinOptimizer#getConnectedMapJoinSize}} sums the sizes of all the map joins in the parent path and the child path, but the search stops when it encounters a UnionOperator or a ReduceSinkOperator (RS).
Starting from {{RS\[5\]}}, the search stops at RS\[10\], so {{SparkMapJoinOptimizer#getConnectedMapJoinSize}} returns 0.
{{connectedMapJoinSize + totalSize}} is then 0 + sizeOf(TS\[1\]) = 460, which does not exceed maxSize, so the size check passes even though MAPJOIN\[27\] already uses the whole threshold.
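
To make the effect easier to see, here is a toy model in Python. It is not Hive code: the function names, the linear chain of joins and the {{<=}} comparison are assumptions made for illustration only. {{convert_local}} mimics the behaviour described above, where a join that stays a common join keeps its RS and hides the map joins behind it. {{convert_global}} is a hypothetical variant that charges every converted small table against one shared budget, which is the behaviour the issue description expects.

```python
def connected_mapjoin_size(sizes, converted, i):
    """Sum the sizes of already-converted joins reachable from join i.

    The walk stops at the first join that is still a common join,
    because that join keeps its ReduceSink (RS) boundary.
    """
    total = 0
    j = i + 1  # child path (later joins in the chain)
    while j < len(sizes) and converted[j]:
        total += sizes[j]
        j += 1
    j = i - 1  # parent path (earlier joins in the chain)
    while j >= 0 and converted[j]:
        total += sizes[j]
        j -= 1
    return total


def convert_local(sizes, max_size):
    """Convert joins from last to first, checking only the connected size."""
    converted = [False] * len(sizes)
    for i in reversed(range(len(sizes))):
        connected = connected_mapjoin_size(sizes, converted, i)
        if connected + sizes[i] <= max_size:
            converted[i] = True
    return converted


def convert_global(sizes, max_size):
    """Hypothetical variant: one budget shared by all converted joins."""
    converted = [False] * len(sizes)
    used = 0
    for i in reversed(range(len(sizes))):
        if used + sizes[i] <= max_size:
            converted[i] = True
            used += sizes[i]
    return converted


if __name__ == "__main__":
    # Small tables t1, t2, t3 from the example above, maxSize = 460.
    print(convert_local([460, 460, 460], 460))   # [True, False, True]
    print(convert_global([460, 460, 460], 460))  # [False, False, True]
```

With the sizes from the issue description (B, C, D, E = 10 each and a threshold of 20), {{convert_local}} converts E, D and B but not C, while {{convert_global}} converts only E and D.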





> Small table is converted to map join even the total size of small tables 
> exceeds the threshold(hive.auto.convert.join.noconditionaltask.size)
> ---------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HIVE-17018
>                 URL: https://issues.apache.org/jira/browse/HIVE-17018
>             Project: Hive
>          Issue Type: Bug
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>
>  We use "hive.auto.convert.join.noconditionaltask.size" as the threshold: if 
> the sum of the sizes of n-1 of the tables/partitions in an n-way join is 
> smaller than it, the join is converted to a map join. For example, take A join B 
> join C join D join E, where the big table is A (100M) and the small tables are 
> B (10M), C (10M), D (10M) and E (10M), and we set 
> hive.auto.convert.join.noconditionaltask.size=20M. In the current code, E, D and B 
> are converted to map joins but C is not. In my 
> understanding, because hive.auto.convert.join.noconditionaltask.size can only 
> hold E and D, neither C nor B should be converted to a map join.  
> Let me explain why B is still converted to a map join.
> In the current code, 
> [SparkMapJoinOptimizer#getConnectedMapJoinSize|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java#L364]
>  sums the sizes of all the map joins in the parent path and child path. The search 
> stops when it encounters a [UnionOperator or 
> ReduceSinkOperator|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java#L381].
>  C is not converted to a map join because {{(connectedMapJoinSize + 
> totalSize) > maxSize}} [see 
> code|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java#L330], so the
>  RS before the join of C remains. When calculating whether B will be 
> converted to a map join, {{getConnectedMapJoinSize}} returns 0 on encountering that 
> [RS 
> |https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java#409],
>  so {{(connectedMapJoinSize + totalSize) < maxSize}} holds and B is converted.
> [~xuefuz] or [~jxiang]: can you help check whether this is a bug, as you 
> are more familiar with SparkMapJoinOptimizer?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
