[ https://issues.apache.org/jira/browse/TEZ-2496?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14617362#comment-14617362 ]

Bikas Saha commented on TEZ-2496:
---------------------------------

Summarizing an offline discussion on the pros and cons of this approach:
1) Lives in user land - easier to iterate.
2) Memory efficient - the shuffle vertex manager can apply a policy specific to 
the partition stats use case. Deterministic sizes mean it can aggregate upon 
event receipt and discard the raw data (see the sketch after this list).
3) CPU efficient - because it does not call getStatistics() repeatedly.
4) Works with pipelining and getting early stats from running tasks. 
getStatistics() would get expensive for this.
5) Allows for other use cases like sending partition stats to inputs for 
runtime optimizations. Only the shuffle vertex manager can correctly do this 
since it merges partitions during auto reduce.
6) Once this has stabilized and been optimized, we can transfer the logic to a 
partition stats API that would be generally available as part of the system.
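
A minimal sketch of the aggregation in point 2, assuming a hypothetical 
per-source-task stats payload (class and field names below are illustrative, 
not the actual Tez event API): fold the reported partition sizes into running 
totals as soon as each event arrives, then drop the event.
{noformat}
// Illustrative only: fold per-partition sizes into running totals as each
// stats event arrives, so the raw per-task data can be discarded immediately.
public class PartitionSizeAggregator {

  /** Hypothetical payload: sizes reported by one source task, one entry per partition. */
  public static class PartitionStatsEvent {
    final long[] partitionSizes;
    public PartitionStatsEvent(long[] partitionSizes) {
      this.partitionSizes = partitionSizes;
    }
  }

  // Running totals per downstream partition - the only state that is retained.
  private final long[] aggregatedSizes;

  public PartitionSizeAggregator(int numPartitions) {
    this.aggregatedSizes = new long[numPartitions];
  }

  /** Called once per event; the event can be garbage collected after this returns. */
  public synchronized void onStatsEvent(PartitionStatsEvent event) {
    for (int p = 0; p < aggregatedSizes.length; p++) {
      aggregatedSizes[p] += event.partitionSizes[p];
    }
  }

  /** Snapshot of the per-partition totals accumulated so far. */
  public synchronized long[] currentSizes() {
    return aggregatedSizes.clone();
  }
}
{noformat}
Because each event is folded into the running totals and immediately discarded, 
memory stays proportional to the number of partitions rather than the number of 
source tasks, which is the memory/CPU argument in points 2 and 3.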

> Consider scheduling tasks in ShuffleVertexManager based on the partition 
> sizes from the source
> ----------------------------------------------------------------------------------------------
>
>                 Key: TEZ-2496
>                 URL: https://issues.apache.org/jira/browse/TEZ-2496
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Rajesh Balamohan
>            Assignee: Rajesh Balamohan
>         Attachments: TEZ-2496.1.patch, TEZ-2496.2.patch, TEZ-2496.3.patch, 
> TEZ-2496.4.patch, TEZ-2496.5.patch, TEZ-2496.6.patch, TEZ-2496.7.patch, 
> TEZ-2496.8.patch
>
>
> Consider scheduling tasks in ShuffleVertexManager based on the partition 
> sizes from the source.  This would be helpful in scenarios where resources 
> are limited (or concurrent jobs are running, or there are multiple waves) 
> with data skew, and the task that gets a large amount of data gets scheduled 
> much later.
> e.g. Consider the following Hive query running in a queue with limited 
> capacity (42 slots in total) @ 200 GB scale:
> {noformat}
> CREATE TEMPORARY TABLE sampleData AS
>   SELECT CASE
>            WHEN ss_sold_time_sk IS NULL THEN 70429
>            ELSE ss_sold_time_sk
>        END AS ss_sold_time_sk,
>        ss_item_sk,
>        ss_customer_sk,
>        ss_cdemo_sk,
>        ss_hdemo_sk,
>        ss_addr_sk,
>        ss_store_sk,
>        ss_promo_sk,
>        ss_ticket_number,
>        ss_quantity,
>        ss_wholesale_cost,
>        ss_list_price,
>        ss_sales_price,
>        ss_ext_discount_amt,
>        ss_ext_sales_price,
>        ss_ext_wholesale_cost,
>        ss_ext_list_price,
>        ss_ext_tax,
>        ss_coupon_amt,
>        ss_net_paid,
>        ss_net_paid_inc_tax,
>        ss_net_profit,
>        ss_sold_date_sk
>   FROM store_sales distribute by ss_sold_time_sk;
> {noformat}
> This generated 39 maps and 134 reduce slots (3 reduce waves). When there are 
> lots of nulls for ss_sold_time_sk, the data tends to skew towards the 70429 
> partition.  If the reducer that gets this data is scheduled much earlier 
> (i.e. in the first wave itself), the entire job would finish faster.
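> As a rough illustration of the idea (not the actual TEZ-2496 patch; the names 
> below are hypothetical), once per-partition sizes have been aggregated from 
> the source, the reduce task start order can simply favor the heaviest 
> partitions so the skewed reducer lands in the first wave:
> {noformat}
> // Illustrative only: order reduce tasks so that the ones owning the largest
> // aggregated partitions are started in the first wave.
> import java.util.Comparator;
> import java.util.List;
> import java.util.stream.Collectors;
> import java.util.stream.IntStream;
>
> public class SkewAwareScheduleOrder {
>
>   /** @return reduce task indices ordered largest aggregated partition first. */
>   public static List<Integer> scheduleOrder(long[] aggregatedPartitionSizes) {
>     return IntStream.range(0, aggregatedPartitionSizes.length)
>         .boxed()
>         .sorted(Comparator.comparingLong(
>             (Integer p) -> aggregatedPartitionSizes[p]).reversed())
>         .collect(Collectors.toList());
>   }
>
>   public static void main(String[] args) {
>     // Skewed example: partition 2 (e.g. the NULL -> 70429 bucket) is huge.
>     long[] sizes = {10L << 20, 12L << 20, 900L << 20, 8L << 20};
>     // Prints [2, 1, 0, 3]: the skewed reducer goes into the first wave.
>     System.out.println(scheduleOrder(sizes));
>   }
> }
> {noformat}
> With the skewed partition scheduled first, it overlaps with the remaining 
> waves instead of starting last and dominating the tail of the job.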



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
