[ https://issues.apache.org/jira/browse/TEZ-3126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15159291#comment-15159291 ]

Rohini Palaniswamy commented on TEZ-3126:
-----------------------------------------

bq. Wouldn't that break the max data per reducer limit? Ignoring the min data 
hint may be fine, but ignoring the max data limit could result in failure 
because it may break operator assumptions (e.g. size of hash table etc.). Say 
the reducer was designed to handle 1G of data and we send it 1.7G instead.

Currently the code blindly assumes all reducers have equal data and combines 
every basePartitionRange consecutive reducers into one reducer. When filters 
are used and the data is skewed, this already sends more than desiredTaskSize 
of data to some reducers (ignoring the max data limit) and little or no data to 
others, which is very inefficient. The proper fix is to bucket according to 
size, as discussed in previous comments, and combine reducers based on that. 
basePartitionRange should be allowed to be a fraction to group better, but if 
we are bucketing and grouping on size, basePartitionRange will no longer be 
needed because partitioning is no longer based on ranges.
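To make the skew problem concrete, here is a minimal Java sketch (not Tez code; the class SizeBasedGrouping and its methods are hypothetical) contrasting the range-based grouping described above with one possible size-based grouping. The size-based variant assumes per-partition output sizes are known and greedily packs consecutive partitions into a task until the next one would push it past desiredTaskInputDataSize. Keeping partitions consecutive is an assumption made for simplicity; a bin-packing scheme that ignores order would be another option. The partition sizes in main are made-up numbers chosen to mirror the 1.7G example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: range-based vs. size-based grouping of shuffle partitions. */
class SizeBasedGrouping {

  /** Range-based: every task gets basePartitionRange (> 0) consecutive partitions; sizes are ignored. */
  static List<long[]> groupByRange(long[] partitionSizes, int basePartitionRange) {
    List<long[]> tasks = new ArrayList<>();
    for (int start = 0; start < partitionSizes.length; start += basePartitionRange) {
      int end = Math.min(start + basePartitionRange, partitionSizes.length);
      tasks.add(Arrays.copyOfRange(partitionSizes, start, end));
    }
    return tasks;
  }

  /**
   * Size-based: pack consecutive partitions into one task until adding the next
   * partition would exceed desiredTaskInputDataSize. A single partition larger
   * than the limit still gets a task of its own.
   */
  static List<long[]> groupBySize(long[] partitionSizes, long desiredTaskInputDataSize) {
    List<long[]> tasks = new ArrayList<>();
    int start = 0;
    long current = 0;
    for (int i = 0; i < partitionSizes.length; i++) {
      if (i > start && current + partitionSizes[i] > desiredTaskInputDataSize) {
        tasks.add(Arrays.copyOfRange(partitionSizes, start, i)); // close the current task
        start = i;
        current = 0;
      }
      current += partitionSizes[i];
    }
    if (start < partitionSizes.length) {
      tasks.add(Arrays.copyOfRange(partitionSizes, start, partitionSizes.length));
    }
    return tasks;
  }

  /** Total input size of one task. */
  static long total(long[] sizes) {
    return Arrays.stream(sizes).sum();
  }

  public static void main(String[] args) {
    // Skewed output sizes (MB) for 8 partitions, e.g. after a selective filter.
    long[] sizes = {900, 800, 0, 0, 0, 0, 100, 100};
    for (long[] t : groupByRange(sizes, 2)) System.out.print(total(t) + " ");
    System.out.println(); // prints: 1700 0 0 200
    for (long[] t : groupBySize(sizes, 1000)) System.out.print(total(t) + " ");
    System.out.println(); // prints: 900 1000
  }
}
```

With a range of 2, one reducer receives 1700 MB while two receive nothing; grouping by size uses two tasks and neither exceeds the 1000 MB limit.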

> Log reason for not reducing parallelism
> ---------------------------------------
>
>                 Key: TEZ-3126
>                 URL: https://issues.apache.org/jira/browse/TEZ-3126
>             Project: Apache Tez
>          Issue Type: Bug
>            Reporter: Jonathan Eagles
>            Assignee: Jonathan Eagles
>            Priority: Critical
>             Fix For: 0.7.1, 0.8.3
>
>         Attachments: TEZ-3126.1.patch, TEZ-3126.2.patch
>
>
> For example, when reducing parallelism from 36 to 22, basePartitionRange 
> will be 1 (36/22 truncates to 1 in integer division), so the vertex will not 
> be reconfigured.
> {code:java|title=ShuffleVertexManager#determineParallelismAndApply|borderStyle=dashed|bgColor=lightgrey}
>     int desiredTaskParallelism = 
>         (int)(
>             (expectedTotalSourceTasksOutputSize+desiredTaskInputDataSize-1)/
>             desiredTaskInputDataSize);
>     if(desiredTaskParallelism < minTaskParallelism) {
>       desiredTaskParallelism = minTaskParallelism;
>     }
>     
>     if(desiredTaskParallelism >= currentParallelism) {
>       return true;
>     }
>     
>     // most shufflers will be assigned this range
>     basePartitionRange = currentParallelism/desiredTaskParallelism;
>     
>     if (basePartitionRange <= 1) {
>       // nothing to do if range is equal 1 partition. shuffler does it by default
>       return true;
>     }
> {code}
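The arithmetic in the quoted snippet, together with the "fraction" idea from the comment, can be sketched as follows. This is not the TEZ-3126 patch: the class ParallelismMath and its methods are hypothetical. reasonForNotReducing mirrors the two early returns in the quoted code and returns the kind of message one might log. fractionalStarts is one assumed way of letting the range be fractional: task i reads partitions [floor(i*P/T), floor((i+1)*P/T)), so 36 partitions over 22 tasks still uses all 22 tasks.

```java
/**
 * Hypothetical sketch of the quoted determineParallelismAndApply arithmetic,
 * a reason string for its early returns, and a fractional alternative.
 */
class ParallelismMath {

  /** Ceiling division clamped to minTaskParallelism, as in the quoted code. */
  static int desiredTaskParallelism(long expectedTotalSourceTasksOutputSize,
      long desiredTaskInputDataSize, int minTaskParallelism) {
    int desired = (int) ((expectedTotalSourceTasksOutputSize + desiredTaskInputDataSize - 1)
        / desiredTaskInputDataSize);
    return Math.max(desired, minTaskParallelism);
  }

  /** Why parallelism is left unchanged, or null if the quoted code would reduce it. */
  static String reasonForNotReducing(int currentParallelism, int desiredTaskParallelism) {
    if (desiredTaskParallelism >= currentParallelism) {
      return "desiredTaskParallelism " + desiredTaskParallelism
          + " >= currentParallelism " + currentParallelism;
    }
    // Integer division truncates: 36 / 22 == 1.
    int basePartitionRange = currentParallelism / desiredTaskParallelism;
    if (basePartitionRange <= 1) {
      return "basePartitionRange " + basePartitionRange + " (" + currentParallelism
          + " / " + desiredTaskParallelism + ", truncated) <= 1";
    }
    return null;
  }

  /**
   * Fractional alternative (hypothetical): task i reads partitions
   * [starts[i], starts[i + 1]), where starts[i] = floor(i * current / desired).
   * Each task gets floor(current / desired) or one more partition.
   */
  static int[] fractionalStarts(int currentParallelism, int desiredTaskParallelism) {
    int[] starts = new int[desiredTaskParallelism + 1];
    for (int i = 0; i <= desiredTaskParallelism; i++) {
      starts[i] = (int) ((long) i * currentParallelism / desiredTaskParallelism);
    }
    return starts;
  }

  public static void main(String[] args) {
    // prints: basePartitionRange 1 (36 / 22, truncated) <= 1
    System.out.println(reasonForNotReducing(36, 22));
    int[] starts = fractionalStarts(36, 22);
    int twoPartitionTasks = 0;
    for (int i = 0; i < 22; i++) {
      if (starts[i + 1] - starts[i] == 2) twoPartitionTasks++;
    }
    // 36 partitions over 22 tasks: 14 tasks read 2 partitions, 8 read 1.
    System.out.println(twoPartitionTasks + " tasks read 2 partitions");
  }
}
```

Because the ranges differ by at most one partition, this only helps when partitions are roughly equal in size; under skew it has the same weakness as integer ranges, which is why the comment argues for grouping on size instead.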



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
