Hi all,
I think I found a bug, but I'm not sure whether the problem is in the
optimizer (predicate pushdown, PPD) or in map-side aggregation.
See the query below:
-------------------------------------
create table if not exists dm_fact_buyer_prd_info_d (
category_id string
,gmv_trade_num int
,user_id int
)
PARTITIONED BY (ds int);
set hive.optimize.ppd=true;
set hive.map.aggr=true;
explain select 20100426, category_id1,category_id2,assoc_idx
from (
select
category_id1
, category_id2
, count(distinct user_id) as assoc_idx
from (
select
t1.category_id as category_id1
, t2.category_id as category_id2
, t1.user_id
from (
select category_id, user_id
from dm_fact_buyer_prd_info_d
where ds <= 20100426
and ds > 20100419
and category_id >0
and gmv_trade_num>0
group by category_id, user_id ) t1
join (
select category_id, user_id
from dm_fact_buyer_prd_info_d
where ds <= 20100426
and ds >20100419
and category_id >0
and gmv_trade_num >0
group by category_id, user_id ) t2 on t1.user_id=t2.user_id
) t1
group by category_id1, category_id2 ) t_o
where category_id1 <> category_id2
and assoc_idx > 2;
--------------------------------
The query above fails at execution time, throwing the exception: "can not cast
UDFOpNotEqual(Text, IntWritable) to UDFOpNotEqual(Text, Text)".
I ran EXPLAIN on the query and the execution plan looks really weird (see
the extra predicate "category_id <> user_id" in the first Filter Operator of
Stage-1):
--------------------------------
ABSTRACT SYNTAX TREE:
(TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_SUBQUERY
(TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_SUBQUERY (TOK_QUERY (TOK_FROM
(TOK_TABREF dm_fact_buyer_prd_info_d)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR
TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL category_id))
(TOK_SELEXPR (TOK_TABLE_OR_COL user_id))) (TOK_WHERE (and (and (and (<=
(TOK_TABLE_OR_COL ds) 20100426) (> (TOK_TABLE_OR_COL ds) 20100419)) (>
(TOK_TABLE_OR_COL category_id) 0)) (> (TOK_TABLE_OR_COL gmv_trade_num) 0)))
(TOK_GROUPBY (TOK_TABLE_OR_COL category_id) (TOK_TABLE_OR_COL user_id))))
t1) (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF
dm_fact_buyer_prd_info_d)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR
TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL category_id))
(TOK_SELEXPR (TOK_TABLE_OR_COL user_id))) (TOK_WHERE (and (and (and (<=
(TOK_TABLE_OR_COL ds) 20100426) (> (TOK_TABLE_OR_COL ds) 20100419)) (>
(TOK_TABLE_OR_COL category_id) 0)) (> (TOK_TABLE_OR_COL gmv_trade_num) 0)))
(TOK_GROUPBY (TOK_TABLE_OR_COL category_id) (TOK_TABLE_OR_COL user_id))))
t2) (= (. (TOK_TABLE_OR_COL t1) user_id) (. (TOK_TABLE_OR_COL t2)
user_id)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT
(TOK_SELEXPR (. (TOK_TABLE_OR_COL t1) category_id) category_id1)
(TOK_SELEXPR (. (TOK_TABLE_OR_COL t2) category_id) category_id2)
(TOK_SELEXPR (. (TOK_TABLE_OR_COL t1) user_id))))) t1)) (TOK_INSERT
(TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR
(TOK_TABLE_OR_COL category_id1)) (TOK_SELEXPR (TOK_TABLE_OR_COL
category_id2)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL
user_id)) assoc_idx)) (TOK_GROUPBY (TOK_TABLE_OR_COL category_id1)
(TOK_TABLE_OR_COL category_id2)))) t_o)) (TOK_INSERT (TOK_DESTINATION
(TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR 20100426) (TOK_SELEXPR
(TOK_TABLE_OR_COL category_id1)) (TOK_SELEXPR (TOK_TABLE_OR_COL
category_id2)) (TOK_SELEXPR (TOK_TABLE_OR_COL assoc_idx))) (TOK_WHERE (and
(<> (TOK_TABLE_OR_COL category_id1) (TOK_TABLE_OR_COL category_id2)) (>
(TOK_TABLE_OR_COL assoc_idx) 2)))))
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-2 depends on stages: Stage-1, Stage-4
Stage-3 depends on stages: Stage-2
Stage-4 is a root stage
Stage-2 depends on stages: Stage-1, Stage-4
Stage-3 depends on stages: Stage-2
Stage-0 is a root stage
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:
t_o:t1:t1:dm_fact_buyer_prd_info_d
TableScan
alias: dm_fact_buyer_prd_info_d
Filter Operator
predicate:
expr: (((((UDFToDouble(ds) <= UDFToDouble(20100426)) and
(UDFToDouble(ds) > UDFToDouble(20100419))) and (UDFToDouble(category_id) >
UDFToDouble(0))) and (gmv_trade_num > 0)) and (category_id <> user_id))
type: boolean
Filter Operator
predicate:
expr: ((((UDFToDouble(ds) <= UDFToDouble(20100426)) and
(UDFToDouble(ds) > UDFToDouble(20100419))) and (UDFToDouble(category_id) >
UDFToDouble(0))) and (gmv_trade_num > 0))
type: boolean
Select Operator
expressions:
expr: category_id
type: string
expr: user_id
type: int
outputColumnNames: category_id, user_id
Group By Operator
keys:
expr: category_id
type: string
expr: user_id
type: int
mode: hash
outputColumnNames: _col0, _col1
Reduce Output Operator
key expressions:
expr: _col0
type: string
expr: _col1
type: int
sort order: ++
Map-reduce partition columns:
expr: _col0
type: string
expr: _col1
type: int
tag: -1
Reduce Operator Tree:
Group By Operator
keys:
expr: KEY._col0
type: string
expr: KEY._col1
type: int
mode: mergepartial
outputColumnNames: _col0, _col1
Select Operator
expressions:
expr: _col0
type: string
expr: _col1
type: int
outputColumnNames: _col0, _col1
File Output Operator
compressed: true
GlobalTableId: 0
table:
input format:
org.apache.hadoop.mapred.SequenceFileInputFormat
output format:
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Stage: Stage-2
Map Reduce
Alias -> Map Operator Tree:
$INTNAME
Reduce Output Operator
key expressions:
expr: _col1
type: int
sort order: +
Map-reduce partition columns:
expr: _col1
type: int
tag: 0
value expressions:
expr: _col0
type: string
expr: _col1
type: int
$INTNAME1
Reduce Output Operator
key expressions:
expr: _col1
type: int
sort order: +
Map-reduce partition columns:
expr: _col1
type: int
tag: 1
value expressions:
expr: _col0
type: string
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
condition expressions:
0 {VALUE._col0} {VALUE._col1}
1 {VALUE._col0}
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: string
expr: _col2
type: string
expr: _col1
type: int
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: string
expr: _col1
type: string
expr: _col2
type: int
outputColumnNames: _col0, _col1, _col2
Group By Operator
aggregations:
expr: count(DISTINCT _col2)
keys:
expr: _col0
type: string
expr: _col1
type: string
expr: _col2
type: int
mode: hash
outputColumnNames: _col0, _col1, _col2, _col3
File Output Operator
compressed: true
GlobalTableId: 0
table:
input format:
org.apache.hadoop.mapred.SequenceFileInputFormat
output format:
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Stage: Stage-3
Map Reduce
Alias -> Map Operator Tree:
file:/group/tbdev/shaojie/scratch/420686432\10003
Reduce Output Operator
key expressions:
expr: _col0
type: string
expr: _col1
type: string
expr: _col2
type: int
sort order: +++
Map-reduce partition columns:
expr: _col0
type: string
expr: _col1
type: string
tag: -1
value expressions:
expr: _col3
type: bigint
Reduce Operator Tree:
Group By Operator
aggregations:
expr: count(DISTINCT KEY._col2)
keys:
expr: KEY._col0
type: string
expr: KEY._col1
type: string
mode: mergepartial
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: string
expr: _col1
type: string
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2
Filter Operator
predicate:
expr: ((_col0 <> _col1) and (UDFToDouble(_col2) >
UDFToDouble(2)))
type: boolean
Select Operator
expressions:
expr: 20100426
type: int
expr: _col0
type: string
expr: _col1
type: string
expr: _col2
type: bigint
outputColumnNames: _col0, _col1, _col2, _col3
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format:
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Stage: Stage-4
Map Reduce
Alias -> Map Operator Tree:
t_o:t1:t2:dm_fact_buyer_prd_info_d
TableScan
alias: dm_fact_buyer_prd_info_d
Filter Operator
predicate:
expr: ((((UDFToDouble(ds) <= UDFToDouble(20100426)) and
(UDFToDouble(ds) > UDFToDouble(20100419))) and (UDFToDouble(category_id) >
UDFToDouble(0))) and (gmv_trade_num > 0))
type: boolean
Filter Operator
predicate:
expr: ((((UDFToDouble(ds) <= UDFToDouble(20100426)) and
(UDFToDouble(ds) > UDFToDouble(20100419))) and (UDFToDouble(category_id) >
UDFToDouble(0))) and (gmv_trade_num > 0))
type: boolean
Select Operator
expressions:
expr: category_id
type: string
expr: user_id
type: int
outputColumnNames: category_id, user_id
Group By Operator
keys:
expr: category_id
type: string
expr: user_id
type: int
mode: hash
outputColumnNames: _col0, _col1
Reduce Output Operator
key expressions:
expr: _col0
type: string
expr: _col1
type: int
sort order: ++
Map-reduce partition columns:
expr: _col0
type: string
expr: _col1
type: int
tag: -1
Reduce Operator Tree:
Group By Operator
keys:
expr: KEY._col0
type: string
expr: KEY._col1
type: int
mode: mergepartial
outputColumnNames: _col0, _col1
Select Operator
expressions:
expr: _col0
type: string
expr: _col1
type: int
outputColumnNames: _col0, _col1
File Output Operator
compressed: true
GlobalTableId: 0
table:
input format:
org.apache.hadoop.mapred.SequenceFileInputFormat
output format:
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Stage: Stage-0
Fetch Operator
limit: -1
--------------------------------
When I disable predicate pushdown (set hive.optimize.ppd=false), the error
goes away; when I disable map-side aggregation (set hive.map.aggr=false),
the error goes away, too.
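For reference, here are the two workarounds as session settings. This is only a sketch of what I changed before re-running the query above; either setting on its own was enough to make the error disappear, and I have not checked which one is the actual cause:

```sql
-- Workaround A: turn off predicate pushdown
-- (hive.map.aggr stays at true)
set hive.optimize.ppd=false;

-- Workaround B: turn off map-side aggregation
-- (hive.optimize.ppd stays at true)
set hive.map.aggr=false;
```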
Does anybody know what the problem is? Any advice would be appreciated.
--
Best Regards,
Ted Xu