[
https://issues.apache.org/jira/browse/HIVE-18340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372473#comment-16372473
]
Ke Jia commented on HIVE-18340:
-------------------------------
[~stakiar]:
This optimization has following effect:
{code:java}
set hive.optimize.index.filter=true;
set hive.auto.convert.join=false;
create table pokes(foo int);
create table poke1(foo1 int, fil string);
insert into table pokes values(1);
insert into table poke1 values(1, "123");
explain select count(*) from pokes join poke1 on (pokes.foo = poke1.foo1)
where poke1.fil=123;
{code}
When enable RF "set hive.spark.dynamic.runtimefilter.pruning=true;", the
explain shows:
{code:java}
STAGE DEPENDENCIES:
Stage-2 is a root stage
Stage-1 depends on stages: Stage-2
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-2
Spark
Edges:
Reducer 6 <- Map 5 (GROUP, 1)
DagName: root_20180222135336_d8f32495-a93d-4c59-8b56-7a9d78304a41:4
Vertices:
Map 5
Map Operator Tree:
TableScan
alias: pokes
filterExpr: foo is not null (type: boolean)
Statistics: Num rows: 3 Data size: 4 Basic stats: COMPLETE
Column stats: NONE
Filter Operator
predicate: foo is not null (type: boolean)
Statistics: Num rows: 3 Data size: 4 Basic stats: COMPLETE
Column stats: NONE
Select Operator
expressions: foo (type: int)
outputColumnNames: _col0
Statistics: Num rows: 3 Data size: 4 Basic stats:
COMPLETE Column stats: NONE
Group By Operator
aggregations: min(_col0), max(_col0),
bloom_filter(_col0, expectedEntries=3)
mode: hash
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 12 Basic stats:
COMPLETE Column stats: NONE
Reduce Output Operator
sort order:
Statistics: Num rows: 1 Data size: 12 Basic stats:
COMPLETE Column stats: NONE
value expressions: _col0 (type: int), _col1 (type:
int), _col2 (type: binary)
Reducer 6
Reduce Operator Tree:
Group By Operator
aggregations: min(VALUE._col0), max(VALUE._col1),
bloom_filter(VALUE._col2, expectedEntries=3)
mode: final
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE
Column stats: NONE
Spark Runtime Filter Partition Pruning Sink Operator
Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE
Column stats: NONE
target column name: foo1
target work: Map 4
Stage: Stage-1
Spark
Edges:
Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 28), Map 4 (PARTITION-LEVEL
SORT, 28)
Reducer 3 <- Reducer 2 (GROUP, 1)
DagName: root_20180222135336_d8f32495-a93d-4c59-8b56-7a9d78304a41:3
Vertices:
Map 1
Map Operator Tree:
TableScan
alias: pokes
filterExpr: foo is not null (type: boolean)
Statistics: Num rows: 3 Data size: 4 Basic stats: COMPLETE
Column stats: NONE
Filter Operator
predicate: foo is not null (type: boolean)
Statistics: Num rows: 3 Data size: 4 Basic stats: COMPLETE
Column stats: NONE
Reduce Output Operator
key expressions: foo (type: int)
sort order: +
Map-reduce partition columns: foo (type: int)
Statistics: Num rows: 3 Data size: 4 Basic stats:
COMPLETE Column stats: NONE
Map 4
Map Operator Tree:
TableScan
alias: poke1
filterExpr: (foo1 is not null and (foo1 BETWEEN
DynamicValue(RS_3_pokes_foo_min) AND DynamicValue(RS_3_pokes_foo_max) and
in_bloom_filter(foo1, DynamicValue(RS_3_pokes_foo_bloom_filter))) and (fil =
123)) (type: boolean)
Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE
Column stats: NONE
Filter Operator
predicate: (foo1 is not null and (foo1 BETWEEN
DynamicValue(RS_3_pokes_foo_min) AND DynamicValue(RS_3_pokes_foo_max) and
in_bloom_filter(foo1, DynamicValue(RS_3_pokes_foo_bloom_filter))) and (fil =
123)) (type: boolean)
Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE
Column stats: NONE
Reduce Output Operator
key expressions: foo1 (type: int)
sort order: +
Map-reduce partition columns: foo1 (type: int)
Statistics: Num rows: 1 Data size: 5 Basic stats:
COMPLETE Column stats: NONE
Reducer 2
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
keys:
0 foo (type: int)
1 foo1 (type: int)
Statistics: Num rows: 3 Data size: 4 Basic stats: COMPLETE
Column stats: NONE
Group By Operator
aggregations: count()
mode: hash
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE
Column stats: NONE
Reduce Output Operator
sort order:
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE
Column stats: NONE
value expressions: _col0 (type: bigint)
Reducer 3
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
mode: mergepartial
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE
Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE
Column stats: NONE
table:
input format:
org.apache.hadoop.mapred.SequenceFileInputFormat
output format:
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
Time taken: 0.337 seconds, Fetched: 120 row(s)
{code}
When disable RF "set hive.spark.dynamic.runtimefilter.pruning=false;", the
explain shows:
{code:java}
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Spark
Edges:
Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 28), Map 4 (PARTITION-LEVEL
SORT, 28)
Reducer 3 <- Reducer 2 (GROUP, 1)
DagName: root_20180222134802_5684d84b-ac66-491f-a4b4-3c9aca32c547:2
Vertices:
Map 1
Map Operator Tree:
TableScan
alias: pokes
filterExpr: foo is not null (type: boolean)
Statistics: Num rows: 3 Data size: 4 Basic stats: COMPLETE
Column stats: NONE
Filter Operator
predicate: foo is not null (type: boolean)
Statistics: Num rows: 3 Data size: 4 Basic stats: COMPLETE
Column stats: NONE
Reduce Output Operator
key expressions: foo (type: int)
sort order: +
Map-reduce partition columns: foo (type: int)
Statistics: Num rows: 3 Data size: 4 Basic stats:
COMPLETE Column stats: NONE
Map 4
Map Operator Tree:
TableScan
alias: poke1
filterExpr: (foo1 is not null and (fil = 123)) (type: boolean)
Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE
Column stats: NONE
Filter Operator
predicate: (foo1 is not null and (fil = 123)) (type:
boolean)
Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE
Column stats: NONE
Reduce Output Operator
key expressions: foo1 (type: int)
sort order: +
Map-reduce partition columns: foo1 (type: int)
Statistics: Num rows: 1 Data size: 5 Basic stats:
COMPLETE Column stats: NONE
Reducer 2
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
keys:
0 foo (type: int)
1 foo1 (type: int)
Statistics: Num rows: 3 Data size: 4 Basic stats: COMPLETE
Column stats: NONE
Group By Operator
aggregations: count()
mode: hash
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE
Column stats: NONE
Reduce Output Operator
sort order:
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE
Column stats: NONE
value expressions: _col0 (type: bigint)
Reducer 3
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
mode: mergepartial
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE
Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE
Column stats: NONE
table:
input format:
org.apache.hadoop.mapred.SequenceFileInputFormat
output format:
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
Time taken: 0.615 seconds, Fetched: 79 row(s)
{code}
There are two major difference when enable RF:
* Create a new stage to generate the runtime filter info for the small table
and flush the info to HDFS (Stage 2).
* Apply the runtime filter expression in the Target Work(Map 4).
[~stakiar], If have any questions, please tell me! Thanks for your review!
> Dynamic Min-Max/BloomFilter runtime-filtering in HoS
> ----------------------------------------------------
>
> Key: HIVE-18340
> URL: https://issues.apache.org/jira/browse/HIVE-18340
> Project: Hive
> Issue Type: New Feature
> Components: Spark
> Affects Versions: 3.0.0
> Reporter: Ke Jia
> Assignee: Ke Jia
> Priority: Major
> Attachments: HIVE-18340.1.patch, HIVE-18340.2.patch
>
>
> Tez implemented Dynamic Min-Max/BloomFilter runtime-filtering in HIVE-15269
> and we should implement the same in HOS.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)