[ https://issues.apache.org/jira/browse/HIVE-17396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Janaki Lahorani updated HIVE-17396:
-----------------------------------
    Description: 
When the target of a partition pruning sink operator is not the same as the 
target of the hash table sink operator, both source and target get scheduled 
within the same Spark job, which can result in a File Not Found Exception.  
HIVE-17225 has a fix to disable DPP in that scenario.  This JIRA is to support 
DPP for such cases.

Test Case:
SET hive.spark.dynamic.partition.pruning=true;
SET hive.auto.convert.join=true;
SET hive.strict.checks.cartesian.product=false;

CREATE TABLE part_table1 (col int) PARTITIONED BY (part1_col int);
CREATE TABLE part_table2 (col int) PARTITIONED BY (part2_col int);

CREATE TABLE reg_table (col int);

ALTER TABLE part_table1 ADD PARTITION (part1_col = 1);

ALTER TABLE part_table2 ADD PARTITION (part2_col = 1);
ALTER TABLE part_table2 ADD PARTITION (part2_col = 2);

INSERT INTO TABLE part_table1 PARTITION (part1_col = 1) VALUES (1);

INSERT INTO TABLE part_table2 PARTITION (part2_col = 1) VALUES (1);
INSERT INTO TABLE part_table2 PARTITION (part2_col = 2) VALUES (2);

INSERT INTO table reg_table VALUES (1), (2), (3), (4), (5), (6);

EXPLAIN SELECT *
FROM   part_table1 pt1,
       part_table2 pt2,
       reg_table rt
WHERE  rt.col = pt1.part1_col
AND    pt2.part2_col = pt1.part1_col;

Plan:
STAGE DEPENDENCIES:
  Stage-2 is a root stage
  Stage-1 depends on stages: Stage-2
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-2
    Spark
#### A masked pattern was here ####
      Vertices:
        Map 1 
            Map Operator Tree:
                TableScan
                  alias: pt1
                  Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    expressions: col (type: int), part1_col (type: int)
                    outputColumnNames: _col0, _col1
                    Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
                    Spark HashTable Sink Operator
                      keys:
                        0 _col1 (type: int)
                        1 _col1 (type: int)
                        2 _col0 (type: int)
                    Select Operator
                      expressions: _col1 (type: int)
                      outputColumnNames: _col0
                      Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
                      Group By Operator
                        keys: _col0 (type: int)
                        mode: hash
                        outputColumnNames: _col0
                        Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
                        Spark Partition Pruning Sink Operator
                          Target column: part2_col (int)
                          partition key expr: part2_col
                          Statistics: Num rows: 1 Data size: 1 Basic stats: COMPLETE Column stats: NONE
                          target work: Map 2
            Local Work:
              Map Reduce Local Work
        Map 2 
            Map Operator Tree:
                TableScan
                  alias: pt2
                  Statistics: Num rows: 2 Data size: 2 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    expressions: col (type: int), part2_col (type: int)
                    outputColumnNames: _col0, _col1
                    Statistics: Num rows: 2 Data size: 2 Basic stats: COMPLETE Column stats: NONE
                    Spark HashTable Sink Operator
                      keys:
                        0 _col1 (type: int)
                        1 _col1 (type: int)
                        2 _col0 (type: int)
            Local Work:
              Map Reduce Local Work

  Stage: Stage-1
    Spark
#### A masked pattern was here ####
      Vertices:
        Map 3 
            Map Operator Tree:
                TableScan
                  alias: rt
                  Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
                  Filter Operator
                    predicate: col is not null (type: boolean)
                    Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
                    Select Operator
                      expressions: col (type: int)
                      outputColumnNames: _col0
                      Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
                      Map Join Operator
                        condition map:
                             Inner Join 0 to 1
                             Inner Join 0 to 2
                        keys:
                          0 _col1 (type: int)
                          1 _col1 (type: int)
                          2 _col0 (type: int)
                        outputColumnNames: _col0, _col1, _col2, _col3, _col4
                        input vertices:
                          0 Map 1
                          1 Map 2
                        Statistics: Num rows: 13 Data size: 13 Basic stats: COMPLETE Column stats: NONE
                        File Output Operator
                          compressed: false
                          Statistics: Num rows: 13 Data size: 13 Basic stats: COMPLETE Column stats: NONE
                          table:
                              input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                              output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
            Local Work:
              Map Reduce Local Work

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink
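
Result check (a sketch, not part of the original report): the plan above 
shows the pruning sink in Map 1 targeting Map 2 within the same stage.  One 
way to confirm that DPP does not change the answer is to run the test-case 
query without EXPLAIN, once with DPP off and once with it on, and compare the 
output.  The column aliases below are made up for readability, and the script 
assumes the tables and remaining settings from the test case are already in 
place on a Hive-on-Spark session.

```sql
-- Baseline run with dynamic partition pruning disabled.
SET hive.spark.dynamic.partition.pruning=false;

SELECT pt1.col       AS pt1_col,
       pt1.part1_col AS part1_col,
       pt2.col       AS pt2_col,
       pt2.part2_col AS part2_col,
       rt.col        AS rt_col
FROM   part_table1 pt1,
       part_table2 pt2,
       reg_table rt
WHERE  rt.col = pt1.part1_col
AND    pt2.part2_col = pt1.part1_col;

-- Same query with dynamic partition pruning enabled; the rows returned
-- should match the baseline run.
SET hive.spark.dynamic.partition.pruning=true;

SELECT pt1.col       AS pt1_col,
       pt1.part1_col AS part1_col,
       pt2.col       AS pt2_col,
       pt2.part2_col AS part2_col,
       rt.col        AS rt_col
FROM   part_table1 pt1,
       part_table2 pt2,
       reg_table rt
WHERE  rt.col = pt1.part1_col
AND    pt2.part2_col = pt1.part1_col;
```

With the rows inserted by the test case, only part1_col = 1 satisfies both 
join conditions, so each run that completes should return the single row 
(1, 1, 1, 1, 1).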


> Support DPP with map joins where the source and target belong in the same 
> stage
> -------------------------------------------------------------------------------
>
>                 Key: HIVE-17396
>                 URL: https://issues.apache.org/jira/browse/HIVE-17396
>             Project: Hive
>          Issue Type: Sub-task
>          Components: Spark
>            Reporter: Janaki Lahorani
>            Assignee: Janaki Lahorani



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
