[ 
https://issues.apache.org/jira/browse/SPARK-2044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14020087#comment-14020087
 ] 

Matei Zaharia commented on SPARK-2044:
--------------------------------------

{quote}
1. Is it a goal to support more kinds of shuffle, e.g. moving the sort from the 
reducer to the mapper? If so, it seems better to add an additional flag to 
ShuffleManager. I find similar statements on page 3.
{quote}
The goal is to allow diverse shuffle implementations, so it doesn't make sense 
to add a flag for it. If we add a flag, every ShuffleManager will need to 
implement this feature. Instead we're trying to make the smallest interface 
that the code consuming this data needs, so that we can try multiple 
implementations of ShuffleManager and see which of these features work best.
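
To illustrate what a small interface of this kind could look like, here is a minimal sketch. The method signatures are simplified assumptions made for this example and are not copied from the attached design doc; `InMemoryShuffleManager` is a hypothetical toy implementation that only shows how consuming code can stay unaware of how the data is moved.

```scala
import scala.collection.mutable

// Hypothetical, simplified signatures (assumptions for this sketch only).
trait ShuffleWriter[K, V] {
  def write(records: Iterator[(K, V)]): Unit
}

trait ShuffleReader[K, V] {
  def read(): Iterator[(K, V)]
}

trait ShuffleManager {
  def registerShuffle(shuffleId: Int, numPartitions: Int): Unit
  def getWriter[K, V](shuffleId: Int, mapId: Int): ShuffleWriter[K, V]
  // Reads the reduce partitions in the half-open range [startPartition, endPartition).
  def getReader[K, V](shuffleId: Int, startPartition: Int, endPartition: Int): ShuffleReader[K, V]
}

// Toy hash-partitioned implementation that keeps every record in memory.
class InMemoryShuffleManager extends ShuffleManager {
  // shuffleId -> one record buffer per reduce partition
  private val buffers = mutable.Map.empty[Int, Array[mutable.ArrayBuffer[(Any, Any)]]]

  def registerShuffle(shuffleId: Int, numPartitions: Int): Unit =
    buffers(shuffleId) = Array.fill(numPartitions)(mutable.ArrayBuffer.empty[(Any, Any)])

  def getWriter[K, V](shuffleId: Int, mapId: Int): ShuffleWriter[K, V] =
    new ShuffleWriter[K, V] {
      def write(records: Iterator[(K, V)]): Unit = {
        val parts = buffers(shuffleId)
        records.foreach { case (k, v) =>
          // Pick the reduce partition from the key's hash code.
          val p = java.lang.Math.floorMod(k.hashCode, parts.length)
          parts(p) += ((k, v))
        }
      }
    }

  def getReader[K, V](shuffleId: Int, startPartition: Int, endPartition: Int): ShuffleReader[K, V] =
    new ShuffleReader[K, V] {
      // A single reader that chains the records of every partition in the range.
      def read(): Iterator[(K, V)] =
        (startPartition until endPartition).iterator
          .flatMap(p => buffers(shuffleId)(p).iterator)
          .map { case (k, v) => (k.asInstanceOf[K], v.asInstanceOf[V]) }
    }
}
```

A sort-based or push-based manager could implement the same three methods without the caller changing, which is the reason for preferring a narrow interface over per-feature flags.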

{quote}
??When the shuffle has no Aggregator (i.e. null or None is passed in), keys and 
values are simply sent across the network. Optionally we might allow the 
ShuffleManager to specify whether keys read from a ShuffleReader are sorted, or 
add a flag to registerShuffle that requests this for keys that have an 
Ordering. This would simplify grouping operators downstream (e.g. cogroup).??
Does this mean that ordering is an inherent property of the input data, or 
that the ShuffleManager is expected to sort the data?
{quote}
The Ordering object means that keys are comparable. This flag here would be to 
tell the ShuffleManager to sort the data, so that downstream algorithms like 
joins can work more efficiently.
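
As a rough illustration of why sorted keys help downstream operators, the sketch below joins two inputs that are already sorted by key in a single pass, with no hash table. `mergeJoin` is a hypothetical helper written for this example, not a Spark API, and it assumes each key appears at most once per side.

```scala
object SortedJoinSketch {
  // Inner-joins two sequences that are already sorted by key.
  // Assumption for this sketch: keys are unique within each input.
  def mergeJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)])
                        (implicit ord: Ordering[K]): List[(K, (A, B))] = {
    val l = left.iterator.buffered
    val r = right.iterator.buffered
    val out = List.newBuilder[(K, (A, B))]
    while (l.hasNext && r.hasNext) {
      val c = ord.compare(l.head._1, r.head._1)
      if (c < 0) l.next()        // left key is smaller: skip it
      else if (c > 0) r.next()   // right key is smaller: skip it
      else {
        // Keys match: emit the joined pair and advance both sides.
        val (k, a) = l.next()
        val (_, b) = r.next()
        out += ((k, (a, b)))
      }
    }
    out.result()
  }
}
```

The `Ordering[K]` is all the join needs from the key type; whether the inputs arrive sorted is what the flag would request from the ShuffleManager.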

{quote}
2. Is it a goal to support prefetch of map data at reducer side?
{quote}
Again, this might be done by some implementations of ShuffleManager.

{quote}
3. For ShuffleReader, why is only a partition range allowed? How about 
extending this API to support multiple individual partitions? For example, if 
the reducer knows that partitions 1,3,5 are ready while 2,4,6 are not, it can 
fetch 1,3,5 first. Compared with making 3 calls to getReader, making one call 
can reduce mapper-side disk seeks, e.g. if partitions 3,5 are contiguous on 
one node.
{quote}
The reducer code shouldn't have to worry about what order to fetch things in. 
Instead, when you request a range, the ShuffleManager implementation can decide 
which partitions to fetch first based on what's available. The idea was that 
some code in DAGScheduler decides on the number of reduce tasks and their 
partition ranges (by looking at the map output size for each partition) and 
then the ShuffleManager on each node fetches the right partitions. Ranges are 
simpler to deal with than arbitrary sets and more space-efficient to represent 
(e.g. imagine we had 100,000 map tasks).
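
The division of labor described above can be sketched as follows. Everything here is hypothetical and written only for illustration: `planRanges` stands in for the scheduler-side decision (a simple greedy packing by map output size, which is an assumed policy), and `fetchOrder` stands in for a ShuffleManager choosing what to fetch first within a requested range.

```scala
object RangeSketch {
  // A half-open range [start, end) of reduce partitions: two ints, however wide it is.
  final case class PartitionRange(start: Int, end: Int)

  // Greedily packs consecutive partitions into ranges, closing a range
  // once its accumulated size reaches targetBytes.
  def planRanges(partitionBytes: IndexedSeq[Long], targetBytes: Long): List[PartitionRange] = {
    val ranges = List.newBuilder[PartitionRange]
    var start = 0
    var acc = 0L
    for (i <- partitionBytes.indices) {
      acc += partitionBytes(i)
      if (acc >= targetBytes) {
        ranges += PartitionRange(start, i + 1)
        start = i + 1
        acc = 0L
      }
    }
    // Any trailing partitions that did not reach the target form the last range.
    if (start < partitionBytes.length) ranges += PartitionRange(start, partitionBytes.length)
    ranges.result()
  }

  // Orders the partitions of one range so that the available ones come first.
  def fetchOrder(range: PartitionRange, available: Set[Int]): List[Int] = {
    val (ready, pending) = (range.start until range.end).toList.partition(available)
    ready ++ pending
  }
}
```

With this split, the reducer asks for one range such as `PartitionRange(1, 7)`, and an implementation that knows partitions 1, 3 and 5 are ready is free to fetch those before 2, 4 and 6.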

{quote}
4. I am not sure whether such a partition list or range should return one 
reader instance or multiple ones.
{quote}
It returns one reader that gathers and combines key-value pairs across all the 
partitions.


> Pluggable interface for shuffles
> --------------------------------
>
>                 Key: SPARK-2044
>                 URL: https://issues.apache.org/jira/browse/SPARK-2044
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle, Spark Core
>            Reporter: Matei Zaharia
>            Assignee: Matei Zaharia
>         Attachments: Pluggableshuffleproposal.pdf
>
>
> Given that a lot of the current activity in Spark Core is in shuffles, I 
> wanted to propose factoring out shuffle implementations in a way that will 
> make experimentation easier. Ideally we will converge on one implementation, 
> but for a while, this could also be used to have several implementations 
> coexist. I'm suggesting this because I am aware of at least three efforts to 
> look at shuffle (from Yahoo!, Intel and Databricks). Some of the things 
> people are investigating are:
> * Push-based shuffle where data moves directly from mappers to reducers
> * Sorting-based instead of hash-based shuffle, to create fewer files (helps a 
> lot with file handles and memory usage on large shuffles)
> * External spilling within a key
> * Changing the level of parallelism or even algorithm for downstream stages 
> at runtime based on statistics of the map output (this is a thing we had 
> prototyped in the Shark research project but never merged in core)
> I've attached a design doc with a proposed interface. It's not too crazy 
> because the interface between shuffles and the rest of the code is already 
> pretty narrow (just some iterators for reading data and a writer interface 
> for writing it). Bigger changes will be needed in the interaction with 
> DAGScheduler and BlockManager for some of the ideas above, but we can handle 
> those separately, and this interface will allow us to experiment with some 
> short-term stuff sooner.
> If things go well I'd also like to send a sort-based shuffle implementation 
> for 1.1, but we'll see how the timing on that works out.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
