[
https://issues.apache.org/jira/browse/HIVE-17087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16096693#comment-16096693
]
Sahil Takiar edited comment on HIVE-17087 at 7/23/17 10:28 PM:
---------------------------------------------------------------
Here is a brief summary of the issue:
* {{DynamicPartitionPruningOptimization}} runs before any join optimizations are done (including map-join conversion)
* It adds a DPP subtree ({{SEL-GBY-SPARKPRUNINGSINK}}) everywhere possible
* After DPP is run, there are a few places where a DPP subtree can be removed:
{{SparkRemoveDynamicPruningBySize}} and during
{{SparkCompiler#runCycleAnalysisForPartitionPruning}}
* This patch adds another place where DPP can be removed, inside
{{SparkMapJoinOptimizer}}
* There are certain scenarios where a DPP subtree needs to be removed during
map-join conversion:
** Say there is a query that can take advantage of DPP, where the small table is partitioned and the big table is un-partitioned (see the example query after this list)
** If the join is converted to a map-join, it doesn't make sense to run DPP at
all
*** Running DPP would require scanning the entire big table before the map-join even starts, and would result in scanning the big table twice
* The example above extends to any scenario where DPP and map-joins are involved: if there is a DPP subtree on the big table that is meant to prune partitions from the small table, then the subtree should be completely removed
* Hive-on-Tez does the same thing; the majority of the code in this patch is copied from {{ConvertJoinMapJoin}}, and the changes to {{ConvertJoinMapJoin}} were done in the original DPP patch for Hive-on-Tez
* Some code was also copied from HIVE-10559, which fixes an NPE in these changes to the map-join conversion
* Added a query to {{spark_dynamic_partition_pruning_2.q}} that was added to
{{dynamic_partition_pruning_2.q}} in HIVE-10559
* I added a new file called {{spark_dynamic_partition_pruning_3.q}} with some queries that join two partitioned tables together; I couldn't find this style of query in the other DPP .q files
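To make that scenario concrete, here is a minimal query of the kind described above, using the tables from the repro in the description. This is only an illustration (it assumes {{regular_table}} is large enough to be chosen as the big, streamed side of the map-join and {{partitioned_table1}} as the small, broadcast side); it is not one of the queries added in the patch's .q files:
{code:sql}
set hive.spark.dynamic.partition.pruning=true;
set hive.auto.convert.join=true;

-- regular_table is un-partitioned (assumed to be the big side of the join);
-- partitioned_table1 is partitioned on part_col (assumed to be the small side).
-- DPP would hang a SEL-GBY-SPARKPRUNINGSINK subtree under the scan of
-- regular_table to prune partitions of partitioned_table1, but once the join
-- is converted to a map-join that subtree only forces an extra full scan of
-- regular_table, so it should be removed.
explain
select *
from regular_table
join partitioned_table1
  on regular_table.col = partitioned_table1.part_col;
{code}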
was (Author: stakiar):
Here is a brief summary of the issue:
* {{DynamicPartitionPruningOptimization}} runs before any join optimizations are done (including map-join conversion)
* It adds a DPP subtree ({{SEL-GBY-SPARKPRUNINGSINK}}) everywhere possible
* After DPP is run, there are a few places where a DPP subtree can be removed:
{{SparkRemoveDynamicPruningBySize}} and during
{{SparkCompiler#runCycleAnalysisForPartitionPruning}}
* This patch adds another place where DPP can be removed, inside
{{SparkMapJoinOptimizer}}
* There are certain scenarios where a DPP subtree needs to be removed during
map-join conversion:
** Say there is a query that can take advantage of DPP, and the two tables being joined are both partitioned
** If the join is converted to a map-join, then it only makes sense to keep one of the DPP subtrees; specifically, the one that scans the big table and is used to prune partitions from the small table
*** In a map-join the small table must be scanned completely first before the
big table is read, so it doesn't make sense to have a DPP subtree on the big
table
* The example above extends to any scenario where DPP and map-joins are involved: if there is a DPP subtree on the big table that is meant to prune partitions from the small table, then the subtree should be completely removed
* Hive-on-Tez does the same thing; the majority of the code in this patch is copied from {{ConvertJoinMapJoin}}, and the changes to {{ConvertJoinMapJoin}} were done in the original DPP patch for Hive-on-Tez
* Some code was also copied from HIVE-10559, which fixes an NPE in these changes to the map-join conversion
* Added a query to {{spark_dynamic_partition_pruning_2.q}} that was added to
{{dynamic_partition_pruning_2.q}} in HIVE-10559
* I added a new file called {{spark_dynamic_partition_pruning_3.q}} with some queries that join two partitioned tables together; I couldn't find this style of query in the other DPP .q files
> Remove unnecessary HoS DPP trees during map-join conversion
> -----------------------------------------------------------
>
> Key: HIVE-17087
> URL: https://issues.apache.org/jira/browse/HIVE-17087
> Project: Hive
> Issue Type: Sub-task
> Components: Spark
> Reporter: Sahil Takiar
> Assignee: Sahil Takiar
> Attachments: HIVE-17087.1.patch, HIVE-17087.2.patch
>
>
> Ran the following query in the {{TestSparkCliDriver}}:
> {code:sql}
> set hive.spark.dynamic.partition.pruning=true;
> set hive.auto.convert.join=true;
> create table partitioned_table1 (col int) partitioned by (part_col int);
> create table partitioned_table2 (col int) partitioned by (part_col int);
> create table regular_table (col int);
> insert into table regular_table values (1);
> alter table partitioned_table1 add partition (part_col = 1);
> insert into table partitioned_table1 partition (part_col = 1) values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10);
> alter table partitioned_table2 add partition (part_col = 1);
> insert into table partitioned_table2 partition (part_col = 1) values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10);
> explain select * from partitioned_table1, partitioned_table2 where partitioned_table1.part_col = partitioned_table2.part_col;
> {code}
> and got the following explain plan:
> {code}
> STAGE DEPENDENCIES:
>   Stage-2 is a root stage
>   Stage-3 depends on stages: Stage-2
>   Stage-1 depends on stages: Stage-3
>   Stage-0 depends on stages: Stage-1
>
> STAGE PLANS:
>   Stage: Stage-2
>     Spark
> #### A masked pattern was here ####
>       Vertices:
>         Map 3
>             Map Operator Tree:
>                 TableScan
>                   alias: partitioned_table1
>                   Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
>                   Select Operator
>                     expressions: col (type: int), part_col (type: int)
>                     outputColumnNames: _col0, _col1
>                     Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
>                     Select Operator
>                       expressions: _col1 (type: int)
>                       outputColumnNames: _col0
>                       Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
>                       Group By Operator
>                         keys: _col0 (type: int)
>                         mode: hash
>                         outputColumnNames: _col0
>                         Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
>                         Spark Partition Pruning Sink Operator
>                           partition key expr: part_col
>                           Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
>                           target column name: part_col
>                           target work: Map 2
>
>   Stage: Stage-3
>     Spark
> #### A masked pattern was here ####
>       Vertices:
>         Map 2
>             Map Operator Tree:
>                 TableScan
>                   alias: partitioned_table2
>                   Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
>                   Select Operator
>                     expressions: col (type: int), part_col (type: int)
>                     outputColumnNames: _col0, _col1
>                     Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
>                     Spark HashTable Sink Operator
>                       keys:
>                         0 _col1 (type: int)
>                         1 _col1 (type: int)
>             Local Work:
>               Map Reduce Local Work
>
>   Stage: Stage-1
>     Spark
> #### A masked pattern was here ####
>       Vertices:
>         Map 1
>             Map Operator Tree:
>                 TableScan
>                   alias: partitioned_table1
>                   Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
>                   Select Operator
>                     expressions: col (type: int), part_col (type: int)
>                     outputColumnNames: _col0, _col1
>                     Statistics: Num rows: 10 Data size: 11 Basic stats: COMPLETE Column stats: NONE
>                     Map Join Operator
>                       condition map:
>                            Inner Join 0 to 1
>                       keys:
>                         0 _col1 (type: int)
>                         1 _col1 (type: int)
>                       outputColumnNames: _col0, _col1, _col2, _col3
>                       input vertices:
>                         1 Map 2
>                       Statistics: Num rows: 11 Data size: 12 Basic stats: COMPLETE Column stats: NONE
>                       File Output Operator
>                         compressed: false
>                         Statistics: Num rows: 11 Data size: 12 Basic stats: COMPLETE Column stats: NONE
>                         table:
>                             input format: org.apache.hadoop.mapred.SequenceFileInputFormat
>                             output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>                             serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
>             Local Work:
>               Map Reduce Local Work
>
>   Stage: Stage-0
>     Fetch Operator
>       limit: -1
>       Processor Tree:
>         ListSink
> {code}
> Stage-2 seems unnecessary, given that Stage-1 is going to do a full table
> scan of {{partitioned_table1}} when running the map-join