[https://issues.apache.org/jira/browse/TEZ-3126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15159291#comment-15159291]
Rohini Palaniswamy commented on TEZ-3126:
-----------------------------------------
bq. Wouldn't that break the max data per reducer limit? Ignoring the min data
hint may be fine, but ignoring the max data limit could result in failure
because it may break operator assumptions (e.g. size of hash table). Say the
reducer was designed to handle 1G of data and we send it 1.7G instead.

The current code blindly assumes that all reducers have equal data and combines
every basePartitionRange consecutive reducers into one reducer. When filters are
used and data is skewed, this already sends more than desiredTaskSize data to
some reducers, ignoring the max data limit, and no data at all to others, which
is very inefficient. The proper fix is to bucket partitions by size, as
discussed in previous comments, and combine reducers based on that.
basePartitionRange should be allowed to be a fraction so that grouping is
better, but if we bucket and group on size, basePartitionRange will no longer be
required, because partitioning will not be based on ranges.
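To make the skew problem concrete, here is a minimal sketch contrasting the range-based grouping described above (every basePartitionRange consecutive partitions go to one reducer) with one possible size-based alternative: greedily packing consecutive partitions until the next one would exceed a target size. The class name, method names and partition sizes are hypothetical and chosen for illustration only; this is not Tez code and not necessarily the bucketing scheme discussed in earlier comments.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not ShuffleVertexManager code.
final class PartitionGrouping {

    // Range-based grouping as described in the comment: each group takes
    // basePartitionRange consecutive partitions regardless of their sizes.
    // The last group takes whatever partitions remain. Assumes basePartitionRange >= 1.
    static long[] groupByRange(long[] partitionSizes, int basePartitionRange) {
        int groups = (partitionSizes.length + basePartitionRange - 1) / basePartitionRange;
        long[] groupSizes = new long[groups];
        for (int p = 0; p < partitionSizes.length; p++) {
            groupSizes[p / basePartitionRange] += partitionSizes[p];
        }
        return groupSizes;
    }

    // Size-based alternative (an assumption, one of several possible schemes):
    // close the current group only when it already holds data and adding the
    // next partition would push it past desiredTaskInputDataSize. Empty
    // partitions are folded into a neighbouring group instead of getting a reducer.
    static List<Long> groupBySize(long[] partitionSizes, long desiredTaskInputDataSize) {
        List<Long> groupSizes = new ArrayList<>();
        long current = 0;
        for (long size : partitionSizes) {
            if (current > 0 && current + size > desiredTaskInputDataSize) {
                groupSizes.add(current);
                current = 0;
            }
            current += size;
        }
        if (current > 0 || groupSizes.isEmpty()) {
            groupSizes.add(current);
        }
        return groupSizes;
    }

    public static void main(String[] args) {
        // Made-up skewed partition sizes (MB) after a filter.
        long[] sizes = {900, 800, 0, 0, 0, 0, 50, 50};
        System.out.println(Arrays.toString(groupByRange(sizes, 2))); // [1700, 0, 0, 100]
        System.out.println(groupBySize(sizes, 1000));                // [900, 900]
    }
}
```

With basePartitionRange = 2, one reducer receives 1700 MB (the "1.7G instead of 1G" case from the quoted question) while two reducers receive nothing; grouping on size keeps every reducer at or below the 1000 MB target, as long as no single partition exceeds it.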
> Log reason for not reducing parallelism
> ---------------------------------------
>
> Key: TEZ-3126
> URL: https://issues.apache.org/jira/browse/TEZ-3126
> Project: Apache Tez
> Issue Type: Bug
> Reporter: Jonathan Eagles
> Assignee: Jonathan Eagles
> Priority: Critical
> Fix For: 0.7.1, 0.8.3
>
> Attachments: TEZ-3126.1.patch, TEZ-3126.2.patch
>
>
> For example, when reducing parallelism from 36 to 22, basePartitionRange will
> be 1 and the vertex will not be reconfigured.
> {code:java|title=ShuffleVertexManager#determineParallelismAndApply|borderStyle=dashed|bgColor=lightgrey}
> int desiredTaskParallelism = (int) (
>     (expectedTotalSourceTasksOutputSize + desiredTaskInputDataSize - 1) /
>     desiredTaskInputDataSize);
> if (desiredTaskParallelism < minTaskParallelism) {
>   desiredTaskParallelism = minTaskParallelism;
> }
>
> if (desiredTaskParallelism >= currentParallelism) {
>   return true;
> }
>
> // most shufflers will be assigned this range
> basePartitionRange = currentParallelism / desiredTaskParallelism;
>
> if (basePartitionRange <= 1) {
>   // nothing to do if range is equal to 1 partition; shuffler does it by default
>   return true;
> }
> {code}
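The arithmetic in the snippet can be reproduced in isolation to show why 36 tasks cannot be reduced to 22: integer division gives 36 / 22 = 1, so the early return fires and the vertex is left unchanged with no indication why. The sketch below mirrors that logic and returns a human-readable reason that a caller could log, which is what the issue title asks for. The class and method names are hypothetical, the input sizes are made up, and this is not the change made in the attached patches.

```java
// Hypothetical sketch mirroring the quoted determineParallelismAndApply logic;
// not Tez code. Returns null if parallelism would be reduced, otherwise the
// reason it is left unchanged.
final class ParallelismCheck {

    static String reasonNotReduced(long expectedTotalSourceTasksOutputSize,
                                   long desiredTaskInputDataSize,
                                   int minTaskParallelism,
                                   int currentParallelism) {
        // Ceiling division: tasks needed if each reads desiredTaskInputDataSize.
        int desiredTaskParallelism = (int) ((expectedTotalSourceTasksOutputSize
                + desiredTaskInputDataSize - 1) / desiredTaskInputDataSize);
        if (desiredTaskParallelism < minTaskParallelism) {
            desiredTaskParallelism = minTaskParallelism;
        }
        if (desiredTaskParallelism >= currentParallelism) {
            return "desired parallelism " + desiredTaskParallelism
                    + " >= current parallelism " + currentParallelism;
        }
        // Integer division truncates, e.g. 36 / 22 == 1.
        int basePartitionRange = currentParallelism / desiredTaskParallelism;
        if (basePartitionRange <= 1) {
            return "basePartitionRange " + basePartitionRange + " (= "
                    + currentParallelism + " / " + desiredTaskParallelism + ") <= 1";
        }
        return null;
    }

    public static void main(String[] args) {
        // Enough data for 22 tasks of 100 units each, currently spread over 36 tasks.
        System.out.println(reasonNotReduced(2200, 100, 1, 36));
        // prints: basePartitionRange 1 (= 36 / 22) <= 1
    }
}
```

By contrast, an input of 1000 units gives a desired parallelism of 10 and basePartitionRange = 36 / 10 = 3, so the method returns null and the reduction would go ahead. Allowing basePartitionRange to be fractional, as suggested in the comment, would avoid the truncation in the 36 to 22 case.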