> On 2011-12-12 02:10:01, Dmitriy Lyubimov wrote:
> > Hm. I hope I did not read the code wrong or miss something. 
> > 
> > 1 -- I am not sure this will actually work as intended unless the # of 
> > reducers is coerced to 1, of which I see no mention in the code. 
> > 2 -- The mappers do nothing, passing all the row pressure on to the sort, 
> > which is absolutely not necessary even if you use combiners. This is going 
> > to be especially the case if you coerce 1 reducer and no combiners. IMO the 
> > mean computation should be pushed up to the mappers to avoid the sort 
> > pressure of map reduce. Then the reduction becomes largely symbolic (but 
> > you do need to pass the # of rows the mapper has seen on to the reducer, 
> > in order for that operation to apply correctly).
> > 3 -- I am not sure -- is NullWritable as a key legit? In my experience the 
> > sequence file reader cannot instantiate it, because NullWritable is a 
> > singleton and its creation is prohibited by making the constructor private.
> 
> Raphael Cendrillon wrote:
>     Thanks Dmitriy.
>     
>     Regarding 1, if I understand correctly, the number of reducers depends 
> on the number of unique keys. Since all keys are set to the same value 
> (null), all of the mapper outputs should arrive at the same reducer. This 
> seems to work in the unit test, but I may be missing something?
>     
>     Regarding 2, that makes a lot of sense. I'm wondering how many rows 
> should be processed per mapper? I guess there is a trade-off between 
> scalability (processing more rows within a single map job means that each 
> row must have fewer columns) and speed? Is there someplace in the SSVD code 
> where the matrix is split into slices of rows that I could use as a 
> reference?
>     
>     Regarding 3, I believe NullWritable is OK. It's used pretty extensively 
> in TimesSquaredJob in DistributedRowMatrix. However, if you feel there is 
> some disadvantage to this I could replace "NullWritable.get()" with "new 
> IntWritable(1)" (that is, set all of the keys to 1). Would that be more 
> suitable?
>     
>
> 
> Dmitriy Lyubimov wrote:
>     The NullWritable objection is withdrawn. Apparently it has been too long 
> since I looked into Hadoop; amazingly, it seems to work now.
>     
>     
>     1 -- I don't think your statement about # of reduce tasks is true. 
>     
>     The job (or, rather, the user) sets the number of reduce tasks via a 
> config property. All users will follow the Hadoop recommendation to set 
> that to about 95% of the capacity they want to take (usually the whole 
> cluster). So in a production environment you are virtually _guaranteed_ to 
> get something like 75 reducers on a 40-node cluster, and consequently 75 
> output files (unless users really want to read the details of your job and 
> figure out that you meant it to be just 1). 
>     Now, it is true that only one of those files will actually end up 
> containing anything, and the rest of the task slots will just be occupied 
> doing nothing. 
>     
>     So there are two problems with that scheme: a) a job that allocates so 
> many task slots that do nothing is not a good citizen, since a real 
> production cluster is always shared with multiple jobs; b) your code assumes 
> the result will end up in partition 0, whereas contractually it may end up 
> in any of the 75 files (in reality, with the default hash partitioner and 
> key 1, it will wind up in partition 0001, unless there is only one reducer, 
> as I guess was the case in your test). 
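>     (For reference, the default HashPartitioner is roughly the following, 
> so with an IntWritable(1) key and 75 reducers the record lands in 
> 1 % 75 = 1, i.e. part-r-00001:
>     
>         // org.apache.hadoop.mapreduce.lib.partition.HashPartitioner
>         public int getPartition(K key, V value, int numReduceTasks) {
>           return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
>         }
>     )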
>     
>     2 -- It is simple: when you send n rows to the reducers, they are 
> shuffled and sorted. Sending massive sets to the reducers has two effects. 
> First, even if they all group under the same key, they are still sorted, at 
> a cost of roughly n log(n/p), where p is the number of partitions, assuming 
> a uniform distribution (which this is not, because you are sending 
> everything to the same place). Just because we can run a distributed sort 
> doesn't mean we should. Second, all these rows are physically moved to the 
> reduce tasks, which is still ~n rows of traffic. Finally, what makes your 
> case especially problematic is that you are sending everything to the same 
> reducer, i.e. you are not actually doing the sort in a distributed way, but 
> rather a simple single-threaded sort at the one reducer that happens to get 
> all the input. 
>     
>     So that scheme allocates a lot of task slots that are not used, does a 
> sort that is not needed, and does it in a single reducer thread for the 
> entire input, which is not parallel at all. 
>     
>     Instead, consider this: the mapper keeps a state consisting of 
> (sum(X), k). It keeps updating it, sum += x, k++, for every new row x. At 
> the end of the cycle (in cleanup) it writes only one tuple, (sum(X), k), as 
> output. So we have just reduced the complexity of the sort and the I/O from 
> millions of elements to just the # of map tasks (which is perhaps just a 
> handful and in reality rarely overshoots 500 mappers). That is at least 
> about 4 orders of magnitude. 
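>     In code, the mapper side would look roughly like the sketch below. This 
> is illustrative only, not the patch itself: MeanMapper is a made-up name, 
> and IntVectorTupleWritable is the not-yet-written tuple writable mentioned 
> in the summary below, carrying the (row count, column-wise sum) pair.
>     
>         public static class MeanMapper
>             extends Mapper<IntWritable, VectorWritable,
>                            NullWritable, IntVectorTupleWritable> {
>           private Vector runningSum;  // column-wise sum of the rows seen
>           private int rowCount;       // k = number of rows this mapper saw
>         
>           @Override
>           protected void map(IntWritable rowIndex, VectorWritable row,
>                              Context ctx) {
>             if (runningSum == null) {
>               runningSum = row.get().clone();                // sum = x
>             } else {
>               runningSum.assign(row.get(), Functions.PLUS);  // sum += x
>             }
>             rowCount++;                                      // k++
>           }
>         
>           @Override
>           protected void cleanup(Context ctx)
>               throws IOException, InterruptedException {
>             if (runningSum != null) {  // map() may never have been called
>               // exactly one (sum(X), k) tuple per map task
>               ctx.write(NullWritable.get(),
>                         new IntVectorTupleWritable(rowCount, runningSum));
>             }
>           }
>         }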
>     
>     Now we send that handful of tuples to a single reducer and just do the 
> combining (sum(X) += sum_i(X); n += n_i, where i runs over the tuples seen 
> by the reducer). And because there are only a handful, the reducer also 
> runs very quickly, so the fact that we coerced it to be 1 is pretty benign. 
> A volume of anywhere between 1 and 500 vectors to sum up doesn't warrant 
> distributed computation. 
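>     The matching reducer side, again just a sketch with illustrative names 
> (MeanReducer, and IntVectorTupleWritable with its getCount()/getVector() 
> accessors, are assumptions rather than the actual patch):
>     
>         public static class MeanReducer
>             extends Reducer<NullWritable, IntVectorTupleWritable,
>                             IntWritable, VectorWritable> {
>           @Override
>           protected void reduce(NullWritable key,
>                                 Iterable<IntVectorTupleWritable> partials,
>                                 Context ctx)
>               throws IOException, InterruptedException {
>             Vector totalSum = null;
>             long totalRows = 0;
>             for (IntVectorTupleWritable partial : partials) {
>               if (totalSum == null) {
>                 totalSum = partial.getVector().clone();      // sum = sum_0
>               } else {
>                 totalSum.assign(partial.getVector(),
>                                 Functions.PLUS);             // sum += sum_i
>               }
>               totalRows += partial.getCount();               // n += n_i
>             }
>             if (totalSum != null && totalRows > 0) {  // empty-input case
>               // mean = sum(X) / n
>               ctx.write(new IntWritable(1),
>                         new VectorWritable(totalSum.divide(totalRows)));
>             }
>           }
>         }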
>     
>     But you have to make sure there is only 1 reducer no matter what the 
> user put into the config, and you have to make sure you do all the heavy 
> lifting in the mappers.
>     
>     Finally, you don't even need to coerce to 1 reducer. You could still 
> have several (with uniformly distributed keys) and do the final combine in 
> the front end of the method. However, given the small size and the 
> triviality of the reduction, it is probably not warranted. Coercing to 1 
> reducer is OK in this case IMO.
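>     (Coercing it in the driver is a one-liner, which overrides whatever 
> mapred.reduce.tasks the user's config carries:
>     
>         job.setNumReduceTasks(1);  // reduction is trivial, 1 reducer is fine
>     )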
>     
>     3 -- I guess any writable is OK except NullWritable. Maybe something 
> has changed; I remember falling into that pitfall several generations of 
> Hadoop ago. You can verify by staging a simple experiment: write a sequence 
> file with NullWritable as either the key or the value and try to read it 
> back. In my test long ago it would write OK but not read back. I believe a 
> similar approach is used for keys in shuffle and sort. There is a 
> reflection-based writable factory inside which tries to use the default 
> constructor of the class to instantiate it, which is (was) not available 
> for NullWritable.
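>     Something along these lines would do as the experiment (a sketch; the 
> /tmp path and the DenseVector payload are arbitrary):
>     
>         Configuration conf = new Configuration();
>         FileSystem fs = FileSystem.get(conf);
>         Path path = new Path("/tmp/nullwritable-check.seq");
>         
>         // write a record keyed by NullWritable
>         SequenceFile.Writer writer = SequenceFile.createWriter(
>             fs, conf, path, NullWritable.class, VectorWritable.class);
>         writer.append(NullWritable.get(),
>             new VectorWritable(new DenseVector(new double[] {1, 2, 3})));
>         writer.close();
>         
>         // now try to read it back
>         SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
>         NullWritable key = NullWritable.get();
>         VectorWritable value = new VectorWritable();
>         while (reader.next(key, value)) {  // per the pitfall above, older
>           System.out.println(value.get()); // Hadoop could fail to read here
>         }
>         reader.close();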
>     
>     
>
> 
> Raphael Cendrillon wrote:
>     Thanks Dmitriy. I've updated the diff to push the row summation into the 
> mapper as you suggested, force the number of reducers to 1, and make the 
> final output key IntWritable. Could you please take a look?
> 
> Dmitriy Lyubimov wrote:
>     Looks good on top of it. 
>     One nitpick that I have is this: 
>     
>     context.write(NullWritable.get(), new VectorWritable(runningSum));
>     
>     but runningSum is initialized in the map loop, which technically may 
> never be called (not likely, but theoretically possible).
>     
>     So I would either initialize the runningSum vector to something 
> non-null in setup(), or better yet just check for null and skip the map 
> output if there was no data. 
>     
>     The same consideration applies to the reducer: it needs to handle the 
> corner case where there is no input correctly.
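>     I.e. something like this in the mapper's cleanup() (sketch):
>     
>         if (runningSum != null) {  // map() may never have been called
>           context.write(NullWritable.get(), new VectorWritable(runningSum));
>         }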
>

Thanks Dmitriy. I've updated the patch to add these checks.


- Raphael


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/3147/#review3838
-----------------------------------------------------------


On 2011-12-13 00:10:57, Raphael Cendrillon wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/3147/
> -----------------------------------------------------------
> 
> (Updated 2011-12-13 00:10:57)
> 
> 
> Review request for mahout and Dmitriy Lyubimov.
> 
> 
> Summary
> -------
> 
> Here's a patch with a simple job to calculate the row mean (column-wise 
> mean). One outstanding issue is the combiner: this requires a writable 
> class, IntVectorTupleWritable, where the Int stores the number of rows and 
> the Vector stores the column-wise sum.
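> 
> A sketch of what that writable might look like (illustrative only, it is 
> not part of this patch; it pairs an int row count with the partial 
> column-wise sum and delegates the vector serialization to VectorWritable):
> 
>   public class IntVectorTupleWritable implements Writable {
>     private int count;                                      // number of rows
>     private final VectorWritable vector = new VectorWritable();  // their sum
> 
>     public IntVectorTupleWritable() {}  // required for reflection-based
>                                         // instantiation by Hadoop
>     public IntVectorTupleWritable(int count, Vector v) {
>       this.count = count;
>       this.vector.set(v);
>     }
> 
>     public int getCount() { return count; }
>     public Vector getVector() { return vector.get(); }
> 
>     @Override
>     public void write(DataOutput out) throws IOException {
>       out.writeInt(count);
>       vector.write(out);
>     }
> 
>     @Override
>     public void readFields(DataInput in) throws IOException {
>       count = in.readInt();
>       vector.readFields(in);
>     }
>   }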
> 
> 
> This addresses bug MAHOUT-923.
>     https://issues.apache.org/jira/browse/MAHOUT-923
> 
> 
> Diffs
> -----
> 
>   
> /trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java
>  1213474 
>   
> /trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixRowMeanJob.java 
> PRE-CREATION 
>   
> /trunk/core/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java
>  1213474 
> 
> Diff: https://reviews.apache.org/r/3147/diff
> 
> 
> Testing
> -------
> 
> Junit test
> 
> 
> Thanks,
> 
> Raphael
> 
>
