[ 
https://issues.apache.org/jira/browse/HIVE-18340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16372473#comment-16372473
 ] 

Ke Jia commented on HIVE-18340:
-------------------------------

[~stakiar]:

This optimization has the following effect:
{code:java}
set hive.optimize.index.filter=true;
set hive.auto.convert.join=false;
create table pokes(foo int);
create table poke1(foo1 int, fil string);
insert into table pokes values(1);
insert into table poke1 values(1, "123");

explain select count(*) from pokes join poke1  on (pokes.foo = poke1.foo1) 
where poke1.fil=123;
{code}
When RF is enabled with "set hive.spark.dynamic.runtimefilter.pruning=true;", the 
explain output is:

{code:java}
STAGE DEPENDENCIES:
  Stage-2 is a root stage
  Stage-1 depends on stages: Stage-2
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-2
    Spark
      Edges:
        Reducer 6 <- Map 5 (GROUP, 1)
      DagName: root_20180222135336_d8f32495-a93d-4c59-8b56-7a9d78304a41:4
      Vertices:
        Map 5
            Map Operator Tree:
                TableScan
                  alias: pokes
                  filterExpr: foo is not null (type: boolean)
                  Statistics: Num rows: 3 Data size: 4 Basic stats: COMPLETE 
Column stats: NONE
                  Filter Operator
                    predicate: foo is not null (type: boolean)
                    Statistics: Num rows: 3 Data size: 4 Basic stats: COMPLETE 
Column stats: NONE
                    Select Operator
                      expressions: foo (type: int)
                      outputColumnNames: _col0
                      Statistics: Num rows: 3 Data size: 4 Basic stats: 
COMPLETE Column stats: NONE
                      Group By Operator
                        aggregations: min(_col0), max(_col0), 
bloom_filter(_col0, expectedEntries=3)
                        mode: hash
                        outputColumnNames: _col0, _col1, _col2
                        Statistics: Num rows: 1 Data size: 12 Basic stats: 
COMPLETE Column stats: NONE
                        Reduce Output Operator
                          sort order:
                          Statistics: Num rows: 1 Data size: 12 Basic stats: 
COMPLETE Column stats: NONE
                          value expressions: _col0 (type: int), _col1 (type: 
int), _col2 (type: binary)
        Reducer 6
            Reduce Operator Tree:
              Group By Operator
                aggregations: min(VALUE._col0), max(VALUE._col1), 
bloom_filter(VALUE._col2, expectedEntries=3)
                mode: final
                outputColumnNames: _col0, _col1, _col2
                Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE 
Column stats: NONE
                Spark Runtime Filter Partition Pruning Sink Operator
                  Statistics: Num rows: 1 Data size: 12 Basic stats: COMPLETE 
Column stats: NONE
                  target column name: foo1
                  target work: Map 4

  Stage: Stage-1
    Spark
      Edges:
        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 28), Map 4 (PARTITION-LEVEL 
SORT, 28)
        Reducer 3 <- Reducer 2 (GROUP, 1)
      DagName: root_20180222135336_d8f32495-a93d-4c59-8b56-7a9d78304a41:3
      Vertices:
        Map 1
            Map Operator Tree:
                TableScan
                  alias: pokes
                  filterExpr: foo is not null (type: boolean)
                  Statistics: Num rows: 3 Data size: 4 Basic stats: COMPLETE 
Column stats: NONE
                  Filter Operator
                    predicate: foo is not null (type: boolean)
                    Statistics: Num rows: 3 Data size: 4 Basic stats: COMPLETE 
Column stats: NONE
                    Reduce Output Operator
                      key expressions: foo (type: int)
                      sort order: +
                      Map-reduce partition columns: foo (type: int)
                      Statistics: Num rows: 3 Data size: 4 Basic stats: 
COMPLETE Column stats: NONE
        Map 4
            Map Operator Tree:
                TableScan
                  alias: poke1
                  filterExpr: (foo1 is not null and (foo1 BETWEEN 
DynamicValue(RS_3_pokes_foo_min) AND DynamicValue(RS_3_pokes_foo_max) and 
in_bloom_filter(foo1, DynamicValue(RS_3_pokes_foo_bloom_filter))) and (fil = 
123)) (type: boolean)
                  Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE 
Column stats: NONE
                  Filter Operator
                    predicate: (foo1 is not null and (foo1 BETWEEN 
DynamicValue(RS_3_pokes_foo_min) AND DynamicValue(RS_3_pokes_foo_max) and 
in_bloom_filter(foo1, DynamicValue(RS_3_pokes_foo_bloom_filter))) and (fil = 
123)) (type: boolean)
                    Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE 
Column stats: NONE
                    Reduce Output Operator
                      key expressions: foo1 (type: int)
                      sort order: +
                      Map-reduce partition columns: foo1 (type: int)
                      Statistics: Num rows: 1 Data size: 5 Basic stats: 
COMPLETE Column stats: NONE
        Reducer 2
            Reduce Operator Tree:
              Join Operator
                condition map:
                     Inner Join 0 to 1
                keys:
                  0 foo (type: int)
                  1 foo1 (type: int)
                Statistics: Num rows: 3 Data size: 4 Basic stats: COMPLETE 
Column stats: NONE
                Group By Operator
                  aggregations: count()
                  mode: hash
                  outputColumnNames: _col0
                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE 
Column stats: NONE
                  Reduce Output Operator
                    sort order:
                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE 
Column stats: NONE
                    value expressions: _col0 (type: bigint)
        Reducer 3
            Reduce Operator Tree:
              Group By Operator
                aggregations: count(VALUE._col0)
                mode: mergepartial
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE 
Column stats: NONE
                File Output Operator
                  compressed: false
                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE 
Column stats: NONE
                  table:
                      input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

Time taken: 0.337 seconds, Fetched: 120 row(s)
{code}
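The Stage-2 aggregation above (min(_col0), max(_col0), bloom_filter(_col0, expectedEntries=3)) can be sketched as follows. This is a deliberately simplified stand-in for illustration only: Hive's actual BloomKFilter uses Murmur hashing and a sized bit array, not the SHA-256-over-a-64-bit-integer scheme used here.

```python
import hashlib

BITS = 64        # illustrative bloom-filter size; Hive sizes it from expectedEntries
NUM_HASHES = 3   # illustrative number of hash functions

def _positions(key):
    # Derive NUM_HASHES bit positions from one digest; a simplified
    # stand-in for the Murmur-based hashing in Hive's BloomKFilter.
    digest = hashlib.sha256(str(key).encode()).digest()
    return [int.from_bytes(digest[i * 4:(i + 1) * 4], "big") % BITS
            for i in range(NUM_HASHES)]

def build_runtime_filter(keys):
    """Compute the (min, max, bloom) triple that Stage-2's final
    Group By Operator emits for the build-side join keys."""
    bloom = 0
    for k in keys:
        for p in _positions(k):
            bloom |= 1 << p
    return min(keys), max(keys), bloom

def in_bloom_filter(bloom, key):
    """Membership probe: may return a false positive, never a
    false negative."""
    return all(bloom >> p & 1 for p in _positions(key))
```

The important property is the one Hive relies on: the probe can wrongly accept a key that is not on the build side (a false positive, which only costs an unnecessary join probe), but it never rejects a key that is present.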
When RF is disabled with "set hive.spark.dynamic.runtimefilter.pruning=false;", the 
explain output is:

{code:java}
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Spark
      Edges:
        Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 28), Map 4 (PARTITION-LEVEL 
SORT, 28)
        Reducer 3 <- Reducer 2 (GROUP, 1)
      DagName: root_20180222134802_5684d84b-ac66-491f-a4b4-3c9aca32c547:2
      Vertices:
        Map 1
            Map Operator Tree:
                TableScan
                  alias: pokes
                  filterExpr: foo is not null (type: boolean)
                  Statistics: Num rows: 3 Data size: 4 Basic stats: COMPLETE 
Column stats: NONE
                  Filter Operator
                    predicate: foo is not null (type: boolean)
                    Statistics: Num rows: 3 Data size: 4 Basic stats: COMPLETE 
Column stats: NONE
                    Reduce Output Operator
                      key expressions: foo (type: int)
                      sort order: +
                      Map-reduce partition columns: foo (type: int)
                      Statistics: Num rows: 3 Data size: 4 Basic stats: 
COMPLETE Column stats: NONE
        Map 4
            Map Operator Tree:
                TableScan
                  alias: poke1
                  filterExpr: (foo1 is not null and (fil = 123)) (type: boolean)
                  Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE 
Column stats: NONE
                  Filter Operator
                    predicate: (foo1 is not null and (fil = 123)) (type: 
boolean)
                    Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE 
Column stats: NONE
                    Reduce Output Operator
                      key expressions: foo1 (type: int)
                      sort order: +
                      Map-reduce partition columns: foo1 (type: int)
                      Statistics: Num rows: 1 Data size: 5 Basic stats: 
COMPLETE Column stats: NONE
        Reducer 2
            Reduce Operator Tree:
              Join Operator
                condition map:
                     Inner Join 0 to 1
                keys:
                  0 foo (type: int)
                  1 foo1 (type: int)
                Statistics: Num rows: 3 Data size: 4 Basic stats: COMPLETE 
Column stats: NONE
                Group By Operator
                  aggregations: count()
                  mode: hash
                  outputColumnNames: _col0
                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE 
Column stats: NONE
                  Reduce Output Operator
                    sort order:
                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE 
Column stats: NONE
                    value expressions: _col0 (type: bigint)
        Reducer 3
            Reduce Operator Tree:
              Group By Operator
                aggregations: count(VALUE._col0)
                mode: mergepartial
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE 
Column stats: NONE
                File Output Operator
                  compressed: false
                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE 
Column stats: NONE
                  table:
                      input format: 
org.apache.hadoop.mapred.SequenceFileInputFormat
                      output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

Time taken: 0.615 seconds, Fetched: 79 row(s)
{code}
There are two major differences when RF is enabled:
* A new stage (Stage-2) is created to generate the runtime filter info for the small 
table and flush that info to HDFS.
* The runtime filter expression is applied in the target work (Map 4).
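The filterExpr that Map 4 gains in the RF-enabled plan (foo1 BETWEEN the dynamic min and max, then in_bloom_filter on foo1) amounts to the pruning check below. The function and variable names here are illustrative, not Hive's; the bloom filter is modeled as a plain membership callable:

```python
def passes_runtime_filter(value, rf_min, rf_max, in_bloom):
    """Mimic Map 4's filterExpr: null check, then the cheap min/max
    range check, then the bloom-filter membership probe."""
    if value is None:
        return False          # foo1 is not null
    if not (rf_min <= value <= rf_max):
        return False          # foo1 BETWEEN DynamicValue(min) AND DynamicValue(max)
    return in_bloom(value)    # in_bloom_filter(foo1, DynamicValue(bloom_filter))

# Probe-side values for poke1.foo1; the build side (pokes.foo) produced
# min=1, max=1, and a bloom filter containing {1}, modeled here as a set.
bloom = {1}
rows = [0, 1, 2, 5]
kept = [r for r in rows if passes_runtime_filter(r, 1, 1, bloom.__contains__)]
# kept == [1]
```

Rows that fail this check are dropped at the scan, before they are shuffled to the join in Reducer 2, which is where the runtime savings come from.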
[~stakiar], if you have any questions, please let me know! Thanks for your review!


> Dynamic Min-Max/BloomFilter runtime-filtering in HoS
> ----------------------------------------------------
>
>                 Key: HIVE-18340
>                 URL: https://issues.apache.org/jira/browse/HIVE-18340
>             Project: Hive
>          Issue Type: New Feature
>          Components: Spark
>    Affects Versions: 3.0.0
>            Reporter: Ke Jia
>            Assignee: Ke Jia
>            Priority: Major
>         Attachments: HIVE-18340.1.patch, HIVE-18340.2.patch
>
>
> Tez implemented Dynamic Min-Max/BloomFilter runtime-filtering in HIVE-15269 
> and we should implement the same in HOS.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
