[ https://issues.apache.org/jira/browse/CRUNCH-222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13687557#comment-13687557 ]

Gabriel Reid commented on CRUNCH-222:
-------------------------------------

I think that this could also be somewhat resolved by some of the functionality 
being discussed in CRUNCH-218, specifically around forcing a given PCollection 
to be checkpointed to disk. The same kind of thing could be done with the state 
of the PCollection as it is after MyDoFn, and I think that would give you the 
same behavior you're describing here.
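
In the meantime, something similar can be approximated with the existing API 
by writing the intermediate PCollection to an explicit target and reading it 
back. A rough sketch, not a real checkpoint API (the path and PType are 
placeholders, assuming a Pipeline named "pipeline", a PCollection<String> 
named "input", and Crunch's To / Writables helpers):

    // Apply the expanding DoFn from this issue's example.
    PCollection<String> expanded =
        input.parallelDo(new MyDoFn(), Writables.strings());

    // Writing to an explicit target and running the pipeline forces the
    // post-MyDoFn state onto disk, splitting the plan at this point.
    pipeline.write(expanded, To.textFile("/tmp/mydofn-checkpoint"));
    pipeline.run();

    // Later stages read the checkpointed data instead of recomputing it.
    PCollection<String> resumed =
        pipeline.readTextFile("/tmp/mydofn-checkpoint");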

I'm a little bit uneasy about shaping the workflows too much based on size 
estimates, mostly because I usually forget to implement DoFn#scaleFactor, and 
I assume that I'm not the only person who has that issue. With that in mind, 
I think I'd feel more comfortable with a more explicit way to specify, either 
directly or indirectly, that a DoFn should be run on the map or reduce side.
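
For reference, the hint in question is just an override on the DoFn. The 5x 
function from this issue would need something like the following for 
size-based planning to have anything to go on (the process body is invented; 
scaleFactor() is the real hook):

    import org.apache.crunch.DoFn;
    import org.apache.crunch.Emitter;

    public class MyDoFn extends DoFn<String, String> {
      @Override
      public void process(String input, Emitter<String> emitter) {
        // Invented 5x expansion, standing in for the real logic.
        for (int i = 0; i < 5; i++) {
          emitter.emit(input + "/" + i);
        }
      }

      @Override
      public float scaleFactor() {
        // Tell the planner the output is ~5x the input size -- the
        // override that's easy to forget.
        return 5.0f;
      }
    }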

On the other hand, right now the size of PCollections is only used for 
deciding on the number of reducers, even though it could probably be used for 
much more (as this issue illustrates). So maybe we should embrace it and 
really try to do optimizations based on data size.
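
To illustrate how size estimates get used today, the reducer-count decision 
essentially divides the estimated output size by a per-reducer target. A 
sketch of that shape (the constant and rounding are illustrative, not the 
planner's actual values):

    // Made-up heuristic: one reducer per ~1GB of estimated output.
    static int estimateNumReducers(long estimatedBytes) {
      final long BYTES_PER_REDUCER = 1000L * 1000L * 1000L;
      long n = (estimatedBytes + BYTES_PER_REDUCER - 1) / BYTES_PER_REDUCER;
      return (int) Math.max(1L, n);
    }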

> Planner should choose to run DoFns on Map or Reduce side depending on data 
> size
> -------------------------------------------------------------------------------
>
>                 Key: CRUNCH-222
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-222
>             Project: Crunch
>          Issue Type: New Feature
>    Affects Versions: 0.7.0
>            Reporter: Joseph Adler
>
> Hi guys,
> I was using Crunch to run a large data pipeline, and came across a problem. 
> In one stage (between two group functions), I have a DoFn that increases the 
> output data size by a factor of 5. You can picture the flow like this:
>   GroupByKeyA -> MyDoFn -> GroupByKeyB
> On Hadoop, this translates to something like this:
>   ---------------------        ---------------------
>   |  map1 -> reduce1  |   ->   |  map2 -> reduce2  |
>   ---------------------        ---------------------
> Logically, you can run MyDoFn in either reduce1 or map2 and get the same 
> results. (The same data will be produced as input to reduce2.)
> In the current implementation, MyDoFn always runs in reduce1. That means the 
> first map/reduce job will write out 5x as much data as it would if MyDoFn 
> ran in map2. For my job, this is a big deal: that's 5x as much data written 
> to HDFS and read back from HDFS, which means a lot of extra work on HDFS and 
> a lot of extra network traffic. Clearly, it would be more efficient to run 
> MyDoFn in map2.
> I'd like to propose that we change the Crunch planner to take into account 
> the output size of a DoFn (or chain of DoFns) when deciding where it should 
> run. When scaleFactor() is <= 1 (the DoFn preserves or reduces the data 
> size), it should run on the reduce side. When scaleFactor() is > 1 (the 
> DoFn increases the data size), it should run on the map side. (For a chain 
> of n DoFns, this implies that Crunch will need to inspect up to n+1 
> candidate sizes, one per possible split point, when deciding where to run 
> each DoFn.)
> -- Joe
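
To make the quoted proposal concrete, here is a minimal sketch of the 
split-point choice for a chain of DoFns between two groupings (the helper is 
hypothetical, not planner code). It inspects the n+1 candidate sizes and 
returns how many DoFns should stay on the reduce side:

    // Given the scaleFactors of the chained DoFns, return k = how many of
    // them to run on the reduce side; the rest run on the following map
    // side. Minimizes the relative data size written between the two jobs.
    static int splitPoint(float[] scaleFactors) {
      double size = 1.0;      // size at cut 0, relative to the chain input
      double bestSize = 1.0;
      int bestCut = 0;
      for (int k = 1; k <= scaleFactors.length; k++) {
        size *= scaleFactors[k - 1];  // size if we cut after the k-th DoFn
        if (size < bestSize) {
          bestSize = size;
          bestCut = k;
        }
      }
      return bestCut;  // e.g. {5.0f} -> 0, i.e. run MyDoFn in map2
    }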

