[ https://issues.apache.org/jira/browse/TEZ-145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14490860#comment-14490860 ]

Gopal V commented on TEZ-145:
-----------------------------

[~bikassaha]: Figure 7 is identical to the runtime expansion I have drawn 
on my whiteboard.

FYI, we've already implemented the simplest case in Tez when implementing the 
keep-alive ShuffleHandler. We stream more than one map output from a host, one 
after the other, on a single shuffle request, by collapsing many requests into 
a multi-get bounded by the 
{{tez.runtime.shuffle.fetch.max.task.output.at.once}} parameter.
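As a rough sketch of what that collapse amounts to (Python here purely for illustration, not the actual Java fetcher code; the function and attempt names are made up), the fetcher batches the map outputs queued for one host into groups of at most the configured size, each becoming a single multi-get request:

```python
# Hypothetical sketch of collapsing per-map-output fetch requests into
# multi-gets, in the spirit of the keep-alive fetcher's use of
# tez.runtime.shuffle.fetch.max.task.output.at.once (not real Tez code).

def collapse_into_multigets(pending_outputs, max_at_once):
    """Group the map outputs queued for one host into batches; each batch
    becomes a single shuffle request listing several outputs at once."""
    batches = []
    for i in range(0, len(pending_outputs), max_at_once):
        batches.append(pending_outputs[i:i + max_at_once])
    return batches

# 5 queued map outputs with at most 2 per request -> 3 requests instead of 5.
requests = collapse_into_multigets(
    ["attempt_0", "attempt_1", "attempt_2", "attempt_3", "attempt_4"], 2)
```

With a higher `max_at_once` the same host is drained in fewer round-trips, which is the whole point of the keep-alive path.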

For [~ozawa]: I'm still behind on my design doc, but I'll explain here anyway.

The Combiner vertex has strange inputs and outputs compared to today's 
vertices, so I've been thinking about using a new term, "Transducer" (a 
Clojure-inspired name), instead of overloading the idea onto the older 
implementation.

A Transducer vertex accepts all partitions from a subset of mappers with which 
it shares some form of locality, and merges them depending on the destination. 
It processes entire map outputs instead of reading a single partition, to 
maximize sequential reads on the first pass.

A Transducer vertex emits data in the same order it was ingested, which avoids 
a sort when the next stage requires merged input.

This is going to be strange, since the input edge treats map outputs as 
splits, but with additional information about the input partition. Because we 
lose the partition after the initial collect(), we need to merge the different 
partitions independently while feeding into the combiner, to preserve the 
final output order between partitions (i.e. "A" could be in partition 7 and 
"Z" in partition 0, and we want the sort order to be (partition, key), just 
like DefaultSorter output).
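The merge described above can be sketched minimally (Python for illustration; names are hypothetical, and the real input would operate on serialized IFile segments rather than tuples). Each upstream map output is already sorted by (partition, key), so an N-way merge keyed the same way reproduces the DefaultSorter-style global order without re-sorting:

```python
import heapq

# Hypothetical sketch of a partition-preserving merge, not the actual
# "PartitionPreservingMergedInput".  Each map output is a full, partitioned,
# sorted stream of (partition, key, value) records, exactly the shape a
# DefaultSorter writes; merging N such streams keyed on (partition, key)
# preserves the final output order across partitions.

def partition_preserving_merge(*map_outputs):
    return list(heapq.merge(*map_outputs, key=lambda rec: (rec[0], rec[1])))

m1 = [(0, "Z", 1), (7, "A", 2)]   # "Z" in partition 0, "A" in partition 7
m2 = [(0, "B", 3), (7, "C", 4)]
merged = partition_preserving_merge(m1, m2)
# All partition-0 records come before all partition-7 records, each run
# internally key-sorted, matching the (partition, key) order above.
```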

We need a "PartitionPreservingMergedInput" paired with an 
OrderedPartitionedOutput on the upstream, while the regular Unordered variants 
will do for the unordered cases.

On the output side, we'll have to combine an UnorderedPartitionedOutput (call 
it a "PreorderedPartitionedOutput" to indicate its still-sorted nature) with 
an OrderedGroupedMergedKVInput on the same edge, so we need to take care not 
to reorder the data when writing it out.

[~bikassaha]/[~hitesh] can correct me, but the Tez implementation of this 
needs a minimum of 2 edge managers and 1 vertex manager (and then a 
translation of the MR DAG into this Tez DAG).

This needed pipelined shuffle to be implemented first in order to be performant.

The performance corner case kicks in when we can't afford to wait too long for 
rack-local allocations: when we start up a task we cannot be sure it will 
start in the right rack, since we only get to give YARN a hint about the 
allocation, and we may end up with a TaskSpec + allocation that does not have 
the right locality.

Therefore, a combiner task which gets an off-rack URL should immediately 
duplicate the same composite DME out to the final destination, with a clear 
marker stating that this is a partial chunk (i.e. the pipelined data movement 
needs to be implemented for combiners to function without accidental miss 
costs).
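That forwarding decision can be sketched roughly as follows (Python for illustration; the event fields and function are made up - the real mechanism would ride on Tez DataMovementEvents with the pipelined-shuffle spill markers):

```python
# Hypothetical sketch of an off-rack combiner short-circuiting data it
# should not merge itself.  Field names are invented for illustration; the
# real path would use DataMovementEvents with spill id / last-spill markers.

def route_event(event, my_rack):
    """If the source is off-rack, forward the event straight to the final
    destination, marked as a partial chunk so the consumer knows more
    spills from the same source may still follow."""
    if event["source_rack"] != my_rack:
        forwarded = dict(event, partial_chunk=True)
        return ("forward-to-final", forwarded)
    return ("merge-locally", event)

# An event from rack-2 arriving at a rack-1 combiner is passed through.
action, ev = route_event({"source_rack": "rack-2", "spill_id": 0}, "rack-1")
```

The point of the partial-chunk marker is that the final destination must not treat the forwarded chunk as the combiner's complete output.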

This short-circuits the accidental sub-optimal scenario due to cluster 
occupancy - so we're finally ready for this now that pipelined fetchers are in 
trunk.

> Support a combiner processor that can run non-local to map/reduce nodes
> -----------------------------------------------------------------------
>
>                 Key: TEZ-145
>                 URL: https://issues.apache.org/jira/browse/TEZ-145
>             Project: Apache Tez
>          Issue Type: Bug
>            Reporter: Hitesh Shah
>            Assignee: Tsuyoshi Ozawa
>         Attachments: TEZ-145.2.patch, WIP-TEZ-145-001.patch
>
>
> For aggregate operators that can benefit by running in multi-level trees, 
> support of being able to run a combiner in a non-local mode would allow 
> performance efficiencies to be gained by running a combiner at a rack-level. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
