> On 2011-12-12 02:10:01, Dmitriy Lyubimov wrote:
> > Hm. I hope I did not misread the code or miss something.
> >
> > 1 -- I am not sure this will actually work as intended unless the # of reducers is coerced to 1, of which I see no mention in the code.
> >
> > 2 -- The mappers do nothing, passing all the row pressure on to the sort, which is absolutely not necessary even if you use combiners. This is going to be especially the case if you coerce 1 reducer and no combiners. IMO the mean computation should be pushed up to the mappers to avoid the sort pressure of map reduce. Then the reduction becomes largely symbolic (but you do need to pass on the # of rows each mapper has seen to the reducer, in order for that operation to apply correctly).
> >
> > 3 -- I am not sure -- is NullWritable legit as a key? In my experience the sequence file reader cannot instantiate it, because NullWritable is a singleton and its creation is prohibited by making the constructor private.
>
> Raphael Cendrillon wrote:
>     Thanks Dmitriy.
>
>     Regarding 1, if I understand correctly the number of reducers depends on the number of unique keys. Since all keys are set to the same value (null), all of the mapper outputs should arrive at the same reducer. This seems to work in the unit test, but I may be missing something?
>
>     Regarding 2, that makes a lot of sense. I'm wondering how many rows should be processed per mapper? I guess there is a trade-off between scalability (processing more rows within a single map job means that each row must have fewer columns) and speed? Is there someplace in the SSVD code where the matrix is split into slices of rows that I could use as a reference?
>
>     Regarding 3, I believe NullWritable is OK. It's used pretty extensively in TimesSquaredJob in DistributedRowMatrix. However, if you feel there is some disadvantage to this I could replace "NullWritable.get()" with "new IntWritable(1)" (that is, set all of the keys to 1). Would that be more suitable?
>
> Dmitriy Lyubimov wrote:
>     The NullWritable objection is withdrawn. Apparently I haven't looked into Hadoop for too long; amazingly, it seems to work now.
>
>     1 -- I don't think your statement about the # of reduce tasks is true. The job (or, rather, the user) sets the number of reduce tasks via a config property. All users will follow the Hadoop recommendation to set that to 95% of the capacity they want to take (usually the whole cluster). So in a production environment you are virtually _guaranteed_ to have a reducer count of something like 75 on a 40-node cluster, and consequently 75 output files (unless users really want to read the details of your job and figure out that you meant it to be just 1). Now, it is true that only one file will actually end up having something in it, and the rest of the task slots will just be occupied doing nothing.
>
>     So there are two problems with that scheme: a) a job that allocates so many task slots that do nothing is not a good citizen, since a real production cluster is always shared with multiple jobs; b) your code assumes the result will end up in partition 0, whereas contractually it may end up in any of the 75 files (in reality, with the default hash partitioner and key 1 it will wind up in partition 0001, unless there's only one reducer, which I guess was the case in your test).
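To make point 1 concrete, here is a minimal driver-side sketch of pinning the job to a single reducer regardless of what the user put into the config. The class and job names (RowMeanDriver, RowMeanMapper, RowMeanReducer, "MatrixRowMeanJob") are illustrative only, not the ones from the patch; the mapper and reducer sketches appear further down in this thread.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.mahout.math.VectorWritable;

    // Hypothetical driver for a row-mean job. RowMeanMapper and RowMeanReducer are the
    // sketches shown later in this thread.
    public final class RowMeanDriver {

      private RowMeanDriver() {}

      public static Job createJob(Configuration conf, Path input, Path output) throws IOException {
        Job job = new Job(conf, "MatrixRowMeanJob");
        job.setJarByClass(RowMeanDriver.class);

        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        job.setMapperClass(RowMeanMapper.class);
        job.setReducerClass(RowMeanReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(VectorWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(VectorWritable.class);

        // Override whatever reduce-task count the user configured: the reduction here is
        // trivial, and a single reducer guarantees the one result lands in part-r-00000.
        job.setNumReduceTasks(1);
        return job;
      }
    }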
>     2 -- It is simple. When you send n rows to the reducers, they are shuffled and sorted. Sending massive sets to the reducers has two effects. First, even if they all group under the same key, they are still sorted, at a cost of ~ n log(n/p) where p is the number of partitions, assuming a uniform distribution (which it is not, because you are sending everything to the same place). Just because we can run a distributed sort doesn't mean we should. Second, all these rows are physically moved to the reduce tasks, which is still ~n rows. Finally, what has made your case especially problematic is that you are sending everything to the same reducer, i.e. you are not actually doing the sort in a distributed way but rather a simple single-threaded sort at the one reducer that happens to get all the input.
>
>     So that would allocate a lot of task slots that are not used; do a sort that is not needed; and do it in a single reducer thread over the entire input, which is not parallel at all.
>
>     Instead, consider this: the map has a state consisting of (sum(X), k). It keeps updating it, sum += x, k++, for every new x. At the end of the cycle (in cleanup) it writes only 1 tuple (sum(X), k) as output. So we just reduced the complexity of the sort and the i/o from millions of elements to just the # of maps (which is perhaps just a handful and in reality rarely overshoots 500 mappers). That is, at least about 4 orders of magnitude.
>
>     Now, we send that handful of tuples to a single reducer and just do the combining (sum(X) += sum_i(X); n += n_i), where i indexes the tuples seen by the reducer. And because it is only a handful, the reducer also runs very quickly, so the fact that we coerced it to be 1 is pretty benign. That volume of anywhere between 1 and 500 vectors to sum up doesn't warrant distributed computation.
>
>     But you have to make sure there's only 1 reducer no matter what the user put into the config, and you have to make sure you do all the heavy lifting in the mappers.
>
>     Finally, you don't even need to coerce to 1 reducer. You could still have several (but uniformly distributed) and do the final combine in the front end of the method. However, given the small size and triviality of the reduction processing, that is probably not warranted. Coercing to 1 reducer is OK in this case IMO.
>
>     3 -- I guess any writable is OK except NullWritable. Maybe something has changed; I remember falling into that pitfall several generations of Hadoop ago. You can verify by staging a simple experiment of writing a sequence file with NullWritable as either key or value and trying to read it back. In my test long ago it would write OK but not read back. I believe a similar mechanism is used with keys in shuffle and sort: there is a reflection writable factory inside which tries to use the default constructor of the class to bring it up, which is (or was) not available for NullWritable.
>
> Raphael Cendrillon wrote:
>     Thanks Dmitriy. I've updated the diff to push the row summation into the mapper as you suggested, force the number of reducers to 1, and make the final output key IntWritable. Could you please take a look?
>
> Dmitriy Lyubimov wrote:
>     Looks good on top of it. One nitpick that I have is this:
>
>         context.write(NullWritable.get(), new VectorWritable(runningSum));
>
>     runningSum is initialized in the map loop, which technically may never be called (not likely, but theoretically possible). So I'd initialize the runningSum vector in setup so it is never null, or better yet just check for null and skip the map output if there is no data.
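A minimal sketch of the map-side accumulation described above, including the null check from the nitpick. It assumes DistributedRowMatrix-style <IntWritable, VectorWritable> input and, to avoid a custom tuple writable, packs the row count into one extra trailing element of the emitted vector; the class name and this packing scheme are assumptions, not the code from the patch.

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.mahout.math.DenseVector;
    import org.apache.mahout.math.Vector;
    import org.apache.mahout.math.VectorWritable;

    // Hypothetical mapper: keeps a column-wise running sum plus a row count as map-side
    // state and emits a single record from cleanup(). The row count is packed into one
    // extra trailing element of the emitted vector so no custom (int, vector) writable
    // is needed for this sketch.
    public class RowMeanMapper extends Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {

      private static final IntWritable OUT_KEY = new IntWritable(1);

      private Vector runningSum; // stays null until the first row is seen
      private long rowCount;

      @Override
      protected void map(IntWritable key, VectorWritable row, Context context) {
        Vector v = row.get();
        if (runningSum == null) {
          runningSum = new DenseVector(v.size());
        }
        runningSum = runningSum.plus(v);
        rowCount++;
      }

      @Override
      protected void cleanup(Context context) throws IOException, InterruptedException {
        if (runningSum == null) {
          return; // this map task saw no rows at all -- emit nothing (the corner case above)
        }
        Vector packed = new DenseVector(runningSum.size() + 1);
        for (int i = 0; i < runningSum.size(); i++) {
          packed.set(i, runningSum.get(i));
        }
        packed.set(runningSum.size(), rowCount);
        context.write(OUT_KEY, new VectorWritable(packed));
      }
    }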
>     The same considerations apply to the reducer: it needs to handle the corner case where there is no input correctly.
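And the matching reducer sketch, which handles the empty-input corner case by simply writing nothing. It assumes the packed (sum, count) layout from the mapper sketch above and is not the code from the patch.

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.mahout.math.DenseVector;
    import org.apache.mahout.math.Vector;
    import org.apache.mahout.math.VectorWritable;

    // Hypothetical single reducer: merges the per-mapper (sum, count) records -- the count
    // packed as the trailing vector element, matching the mapper sketch above -- and writes
    // the column-wise mean, or nothing at all if no mapper emitted anything.
    public class RowMeanReducer extends Reducer<IntWritable, VectorWritable, IntWritable, VectorWritable> {

      @Override
      protected void reduce(IntWritable key, Iterable<VectorWritable> partials, Context context)
          throws IOException, InterruptedException {
        Vector totalSum = null;
        double totalRows = 0.0;

        for (VectorWritable partial : partials) {
          Vector packed = partial.get();
          int numCols = packed.size() - 1;
          if (totalSum == null) {
            totalSum = new DenseVector(numCols);
          }
          for (int i = 0; i < numCols; i++) {
            totalSum.set(i, totalSum.get(i) + packed.get(i));
          }
          totalRows += packed.get(numCols);
        }

        if (totalSum == null || totalRows == 0.0) {
          return; // corner case: no input reached the reducer, so there is no mean to write
        }
        context.write(key, new VectorWritable(totalSum.divide(totalRows)));
      }
    }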
Thanks Dmitriy. I've updated the patch to add these checks.

- Raphael


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/3147/#review3838
-----------------------------------------------------------


On 2011-12-13 00:10:57, Raphael Cendrillon wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/3147/
> -----------------------------------------------------------
> 
> (Updated 2011-12-13 00:10:57)
> 
> 
> Review request for mahout and Dmitriy Lyubimov.
> 
> 
> Summary
> -------
> 
> Here's a patch with a simple job to calculate the row mean (column-wise mean). One outstanding issue is the combiner; this requires a writable class IntVectorTupleWritable, where the int stores the number of rows and the vector stores the column-wise sum.
> 
> 
> This addresses bug MAHOUT-923.
>     https://issues.apache.org/jira/browse/MAHOUT-923
> 
> 
> Diffs
> -----
> 
>   /trunk/core/src/main/java/org/apache/mahout/math/hadoop/DistributedRowMatrix.java 1213474
>   /trunk/core/src/main/java/org/apache/mahout/math/hadoop/MatrixRowMeanJob.java PRE-CREATION
>   /trunk/core/src/test/java/org/apache/mahout/math/hadoop/TestDistributedRowMatrix.java 1213474
> 
> Diff: https://reviews.apache.org/r/3147/diff
> 
> 
> Testing
> -------
> 
> JUnit test
> 
> 
> Thanks,
> 
> Raphael
> 
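For the outstanding combiner issue mentioned in the summary, one possible shape for the IntVectorTupleWritable it describes is sketched below. This is illustrative only and not part of the posted diff: the int carries the number of rows summed so far and the vector carries the column-wise sum, so a combiner could merge partial results without losing the row count.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;
    import org.apache.mahout.math.Vector;
    import org.apache.mahout.math.VectorWritable;

    // Illustrative (int, vector) tuple writable along the lines mentioned in the summary.
    public class IntVectorTupleWritable implements Writable {

      private int count;
      private final VectorWritable vector = new VectorWritable();

      public IntVectorTupleWritable() {
        // no-arg constructor required by Hadoop's reflection-based instantiation
      }

      public IntVectorTupleWritable(int count, Vector sum) {
        this.count = count;
        this.vector.set(sum);
      }

      public int getCount() {
        return count;
      }

      public Vector getVector() {
        return vector.get();
      }

      @Override
      public void write(DataOutput out) throws IOException {
        out.writeInt(count);
        vector.write(out);
      }

      @Override
      public void readFields(DataInput in) throws IOException {
        count = in.readInt();
        vector.readFields(in);
      }
    }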