[
https://issues.apache.org/jira/browse/HIVE-17018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076105#comment-16076105
]
liyunzhang_intel commented on HIVE-17018:
-----------------------------------------
[~csun]: attached an example(HIVE-17018_data_init.q, HIVE-17018.q)
HIVE-17018
{code}
set hive.auto.convert.join.noconditionaltask.size=460;
explain select * from src,t1,t2,t3 where src.key=t1.key1 and src.value=t2.key2
and t1.value1=t3.value3;
{code}
the explain
{code}
STAGE DEPENDENCIES:
Stage-2 is a root stage
Stage-3 depends on stages: Stage-2
Stage-1 depends on stages: Stage-3
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-2
Spark
DagName: root_20170706020353_8bc59675-8374-4ec7-ab9c-4904cd5fcadb:2
Vertices:
Map 5
Map Operator Tree:
TableScan
alias: t3
Statistics: Num rows: 2 Data size: 460 Basic stats: COMPLETE
Column stats: NONE
Filter Operator
predicate: value3 is not null (type: boolean)
Statistics: Num rows: 2 Data size: 460 Basic stats:
COMPLETE Column stats: NONE
Spark HashTable Sink Operator
keys:
0 _col6 (type: string)
1 value3 (type: string)
Local Work:
Map Reduce Local Work
Stage: Stage-3
Spark
DagName: root_20170706020353_8bc59675-8374-4ec7-ab9c-4904cd5fcadb:3
Vertices:
Map 3
Map Operator Tree:
TableScan
alias: t1
Statistics: Num rows: 2 Data size: 460 Basic stats: COMPLETE
Column stats: NONE
Filter Operator
predicate: (key1 is not null and value1 is not null) (type:
boolean)
Statistics: Num rows: 2 Data size: 460 Basic stats:
COMPLETE Column stats: NONE
Spark HashTable Sink Operator
keys:
0 key (type: string)
1 key1 (type: string)
Local Work:
Map Reduce Local Work
Stage: Stage-1
Spark
Edges:
Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 12), Map 4 (PARTITION-LEVEL
SORT, 12)
DagName: root_20170706020353_8bc59675-8374-4ec7-ab9c-4904cd5fcadb:1
Vertices:
Map 1
Map Operator Tree:
TableScan
alias: src
Statistics: Num rows: 29 Data size: 5812 Basic stats:
COMPLETE Column stats: NONE
Filter Operator
predicate: (key is not null and value is not null) (type:
boolean)
Statistics: Num rows: 29 Data size: 5812 Basic stats:
COMPLETE Column stats: NONE
Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 key (type: string)
1 key1 (type: string)
outputColumnNames: _col0, _col1, _col5, _col6
input vertices:
1 Map 3
Statistics: Num rows: 31 Data size: 6393 Basic stats:
COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col1 (type: string)
sort order: +
Map-reduce partition columns: _col1 (type: string)
Statistics: Num rows: 31 Data size: 6393 Basic stats:
COMPLETE Column stats: NONE
value expressions: _col0 (type: string), _col5 (type:
string), _col6 (type: string)
Local Work:
Map Reduce Local Work
Map 4
Map Operator Tree:
TableScan
alias: t2
Statistics: Num rows: 2 Data size: 460 Basic stats: COMPLETE
Column stats: NONE
Filter Operator
predicate: key2 is not null (type: boolean)
Statistics: Num rows: 2 Data size: 460 Basic stats:
COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: key2 (type: string)
sort order: +
Map-reduce partition columns: key2 (type: string)
Statistics: Num rows: 2 Data size: 460 Basic stats:
COMPLETE Column stats: NONE
value expressions: value2 (type: string)
Reducer 2
Local Work:
Map Reduce Local Work
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
keys:
0 _col1 (type: string)
1 key2 (type: string)
outputColumnNames: _col0, _col1, _col5, _col6, _col10, _col11
Statistics: Num rows: 34 Data size: 7032 Basic stats: COMPLETE
Column stats: NONE
Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 _col6 (type: string)
1 value3 (type: string)
outputColumnNames: _col0, _col1, _col5, _col6, _col10,
_col11, _col15, _col16
input vertices:
1 Map 5
Statistics: Num rows: 37 Data size: 7735 Basic stats:
COMPLETE Column stats: NONE
Filter Operator
predicate: ((_col0 = _col5) and (_col1 = _col10) and (_col6
= _col16)) (type: boolean)
Statistics: Num rows: 4 Data size: 836 Basic stats:
COMPLETE Column stats: NONE
Select Operator
expressions: _col0 (type: string), _col1 (type: string),
_col5 (type: string), _col6 (type: string), _col10 (type: string), _col11
(type: string), _col15 (type: string), _col16 (type: string)
outputColumnNames: _col0, _col1, _col2, _col3, _col4,
_col5, _col6, _col7
Statistics: Num rows: 4 Data size: 836 Basic stats:
COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 4 Data size: 836 Basic stats:
COMPLETE Column stats: NONE
table:
input format:
org.apache.hadoop.mapred.SequenceFileInputFormat
output format:
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde:
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
{code}
There are 4 tables src(TS\[0\] size:6393), t1(TS\[1\] size:460),t2(TS\[2\]
size:460),t3(TS\[3\] size:460)
the {{hive.auto.convert.join.noconditionaltask.size}} is 460. Only t3 can be
converted to map join. But in above explain you can see that both t3 and t1 are
converted to map join.
let's explain more detaily:
[maxSize|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java#L188]:460
the logical plan is
{code}
TS[0]-FIL[23]-RS[5]-JOIN[8]-RS[10]-JOIN[13]-RS[15]-JOIN[18]-FIL[22]-SEL[20]-FS[21]
TS[1]-FIL[24]-RS[7]-JOIN[8]
TS[2]-FIL[25]-RS[12]-JOIN[13]
TS[3]-FIL[26]-RS[17]-JOIN[18]
{code}
after TS\[3\] is converted to map join
->
{code}
TS[0]-FIL[23]-RS[5]-JOIN[8]-RS[10]-JOIN[13]-MAPJOIN[27]-FIL[22]-SEL[20]-FS[21]
TS[1]-FIL[24]-RS[7]-JOIN[8]
TS[2]-FIL[25]-RS[12]-JOIN[13]
TS[3]-FIL[26]-RS[17]-JOIN[18]
{code}
TS\[2\] can not converted to a map join because {{connectedMapJoinSize +
sizeOf(TS\[2\]}} >maxSize, and TS\[1\] is converted to a map join
{code}
TS[0]-FIL[23]-MAPJOIN[28]-RS[10]-JOIN[13]-MAPJOIN[27]-FIL[22]-SEL[20]-FS[21]
TS[1]-FIL[24]-RS[7]-JOIN[8]
TS[2]-FIL[25]-RS[12]-JOIN[13]
TS[3]-FIL[26]-RS[17]-JOIN[18]
{code}
the reason why TS\[1\] can be converted to a map join(JOIN\[8\] can be
converted to a map join):
[SparkMapJoinOptimizer#getConnectedMapJoinSize| calculates all the mapjoins in
the parent path and child path.
But the search stops when encountering UnionOperator or ReduceOperator. For
{{RS\[5\]}}, it stop searching at RS\[10\], so
{{SparkMapJoinOptimizer#getConnectedMapJoinSize}}
is 0. {{connectedMapJoinSize + totalSize}} is 0+ sizeOf(TS\[1\])=460,
{{connectedMapJoinSize + totalSize) < maxSize}} matches.
> Small table is converted to map join even the total size of small tables
> exceeds the threshold(hive.auto.convert.join.noconditionaltask.size)
> ---------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: HIVE-17018
> URL: https://issues.apache.org/jira/browse/HIVE-17018
> Project: Hive
> Issue Type: Bug
> Reporter: liyunzhang_intel
> Assignee: liyunzhang_intel
>
> we use "hive.auto.convert.join.noconditionaltask.size" as the threshold. it
> means the sum of size for n-1 of the tables/partitions for a n-way join is
> smaller than it, it will be converted to a map join. for example, A join B
> join C join D join E. Big table is A(100M), small tables are
> B(10M),C(10M),D(10M),E(10M). If we set
> hive.auto.convert.join.noconditionaltask.size=20M. In current code, E,D,B
> will be converted to map join but C will not be converted to map join. In my
> understanding, because hive.auto.convert.join.noconditionaltask.size can only
> contain E and D, so C and B should not be converted to map join.
> Let's explain more why E can be converted to map join.
> in current code,
> [SparkMapJoinOptimizer#getConnectedMapJoinSize|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java#L364]
> calculates all the mapjoins in the parent path and child path. The search
> stops when encountering [UnionOperator or
> ReduceOperator|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java#L381].
> Because C is not converted to map join because {{connectedMapJoinSize +
> totalSize) > maxSize}} [see
> code|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java#L330].The
> RS before the join of C remains. When calculating whether B will be
> converted to map join, {{getConnectedMapJoinSize}} returns 0 as encountering
> [RS
> |https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java#409]
> and causes {{connectedMapJoinSize + totalSize) < maxSize}} matches.
> [~xuefuz] or [~jxiang]: can you help see whether this is a bug or not as you
> are more familiar with SparkJoinOptimizer.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)