[ 
https://issues.apache.org/jira/browse/HIVE-22707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

László Bodor updated HIVE-22707:
--------------------------------
    Description: 
Consider a scenario with two different buckets, where the output is written to
a different bucket than the source. Under specific circumstances,
FileSinkOperator is used only in Reducer stages. If a root work in such a stage
is a merge join work, it is not scanned for output URIs/paths, so the
delegation tokens needed for, e.g., the output S3 bucket are not fetched.

https://github.com/apache/hive/blob/0df4f6c61010b64246d4790f9ce14e966ef34dcb/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java#L1507-L1514
{code}
  public void addCredentials(BaseWork work, DAG dag) throws IOException {
    dag.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
    if (work instanceof MapWork) {
      addCredentials((MapWork) work, dag);
    } else if (work instanceof ReduceWork) {
      addCredentials((ReduceWork) work, dag);
    }
  }
{code}
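
The gap can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions, not the attached patch: the classes below are hypothetical stand-ins for Hive's BaseWork, MapWork, ReduceWork and MergeJoinWork, the field and method names are invented for illustration, and the bucket URI is made up. It shows that a dispatch handling only map and reduce works returns nothing for a merge join work that wraps a reducer with a file sink, while a dispatch that also unwraps the merge join work finds the output URI.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Stand-in types only: these are NOT the Hive classes, just a small model of them.
class CredentialScanSketch {

  /** Hypothetical stand-in for a unit of work; holds the URIs its file sinks write to. */
  static class BaseWork {
    final List<String> fileSinkUris = new ArrayList<>();
  }

  static class MapWork extends BaseWork {}

  static class ReduceWork extends BaseWork {}

  /** Hypothetical stand-in for a merge join work that wraps other works. */
  static class MergeJoinWork extends BaseWork {
    final BaseWork mainWork;
    final List<BaseWork> otherWorks;

    MergeJoinWork(BaseWork mainWork, List<BaseWork> otherWorks) {
      this.mainWork = mainWork;
      this.otherWorks = otherWorks;
    }
  }

  /** Mirrors the dispatch quoted above: only map and reduce works are scanned. */
  static Set<String> scanMapAndReduceOnly(BaseWork work) {
    Set<String> uris = new LinkedHashSet<>();
    if (work instanceof MapWork) {
      uris.addAll(work.fileSinkUris);
    } else if (work instanceof ReduceWork) {
      uris.addAll(work.fileSinkUris);
    }
    return uris; // a MergeJoinWork falls through and contributes nothing
  }

  /** Same dispatch, but a merge join work is unwrapped and its inner works are scanned. */
  static Set<String> scanWithMergeJoin(BaseWork work) {
    Set<String> uris = new LinkedHashSet<>();
    if (work instanceof MergeJoinWork) {
      MergeJoinWork mergeJoinWork = (MergeJoinWork) work;
      uris.addAll(scanWithMergeJoin(mergeJoinWork.mainWork));
      for (BaseWork other : mergeJoinWork.otherWorks) {
        uris.addAll(scanWithMergeJoin(other));
      }
    } else if (work instanceof MapWork || work instanceof ReduceWork) {
      uris.addAll(work.fileSinkUris);
    }
    return uris;
  }

  public static void main(String[] args) {
    // Reducer with a file sink writing to a (made-up) output bucket.
    ReduceWork reducer = new ReduceWork();
    reducer.fileSinkUris.add("s3a://output-bucket/warehouse/catalog_sales_out");

    // The reducer is the main work of a merge join work, as in Reducer 2 of the plan.
    MergeJoinWork rootWork = new MergeJoinWork(reducer, new ArrayList<>());

    System.out.println("map/reduce only: " + scanMapAndReduceOnly(rootWork));
    System.out.println("with merge join: " + scanWithMergeJoin(rootWork));
  }
}
```

In the real code the collected paths would feed the delegation token lookup for the DAG; the model stops at collecting URIs because that is where the merge join work is skipped.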

Sample plan; note the Merge Join Operator [MERGEJOIN_35] in Reducer 2:
{code}
+----------------------------------------------------+
|                      Explain                       |
+----------------------------------------------------+
| Plan optimized by CBO.                             |
|                                                    |
| Vertex dependency in root stage                    |
| Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) |
| Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)        |
|                                                    |
| Stage-3                                            |
|   Stats Work{}                                     |
|     Stage-4                                        |
|       Create Table{"name:":"tpcds_bin_partitioned_orc_1000.catalog_sales_out"} |
|         Stage-0                                    |
|           Move Operator                            |
|             Stage-1                                |
|               Reducer 3                            |
|               File Output Operator [FS_20]         |
|                 Group By Operator [GBY_18] (rows=1 width=440) |
|                   Output:["_col0"],aggregations:["compute_stats(VALUE._col0)"] |
|                 <-Reducer 2 [CUSTOM_SIMPLE_EDGE]   |
|                   File Output Operator [FS_10]     |
|                     table:{"name:":"tpcds_bin_partitioned_orc_1000.catalog_sales_out"} |
|                     Select Operator [SEL_9] (rows=8400 width=7) |
|                       Output:["_col0"]             |
|                       Merge Join Operator [MERGEJOIN_35] (rows=8400 width=7) |
|                         Conds:RS_38._col1=RS_41._col0(Inner),Output:["_col1"] |
|                       <-Map 1 [SIMPLE_EDGE] vectorized |
|                         SHUFFLE [RS_38]            |
|                           PartitionCols:_col1      |
|                           Select Operator [SEL_37] (rows=16799 width=15) |
|                             Output:["_col1"]       |
|                             Filter Operator [FIL_36] (rows=16799 width=15) |
|                               predicate:((cs_sold_time_sk = 74858L) and cs_call_center_sk is not null) |
|                               TableScan [TS_0] (rows=1439980416 width=15) |
|                                 tpcds_bin_partitioned_orc_1000@catalog_sales,cs, ACID table,Tbl:COMPLETE,Col:PARTIAL,Output:["cs_sold_time_sk","cs_call_center_sk"] |
|                       <-Map 4 [SIMPLE_EDGE] vectorized |
|                         SHUFFLE [RS_41]            |
|                           PartitionCols:_col0      |
|                           Select Operator [SEL_40] (rows=21 width=107) |
|                             Output:["_col0"]       |
|                             Filter Operator [FIL_39] (rows=21 width=107) |
|                               predicate:((CAST( cc_county AS STRING) = 'Williamson County') and cc_call_center_sk is not null) |
|                               TableScan [TS_3] (rows=42 width=107) |
|                                 tpcds_bin_partitioned_orc_1000@call_center,cc, ACID table,Tbl:COMPLETE,Col:COMPLETE,Output:["cc_call_center_sk","cc_county"] |
|                   PARTITION_ONLY_SHUFFLE [RS_17]   |
|                     Group By Operator [GBY_16] (rows=1 width=424) |
|                       Output:["_col0"],aggregations:["compute_stats(col1, 'hll')"] |
|                       Select Operator [SEL_15] (rows=8400 width=7) |
|                         Output:["col1"]            |
|                          Please refer to the previous Select Operator [SEL_9] |
|         Stage-2                                    |
|           Dependency Collection{}                  |
|              Please refer to the previous Stage-1  |
|                                                    |
+----------------------------------------------------+
{code}


> MergeJoinWork should be considered while collecting DAG credentials
> -------------------------------------------------------------------
>
>                 Key: HIVE-22707
>                 URL: https://issues.apache.org/jira/browse/HIVE-22707
>             Project: Hive
>          Issue Type: Bug
>            Reporter: László Bodor
>            Assignee: László Bodor
>            Priority: Major
>         Attachments: HIVE-22707.01.patch
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
