Thanks Eric, I think I may have found the cause of the problem, but have no idea how to fix it.
My mapper is STDOUT.puts "key1 tab key2 tab text" -- and the job tracker shows
the total number of map output records as, say, 35 million. It then goes through
-combiner /bin/cat (i.e. a no-op, in theory). The job tracker, however, shows 70
million combiner output records. So it seems to me that something isn't quite
working correctly here -- perhaps a double newline being inserted? Something
else? I have not a clue.

Do you know the syntax for not having ANY combiner, or where I could find such
documentation?

Cheers,
Winton

On Wed, Feb 10, 2010 at 4:02 PM, E. Sammer <[email protected]> wrote:
> Winton:
>
> I don't know the exact streaming options you're looking for, but what you
> have looks correct. Generally, to do what you want, all you should have to
> do is 1. sort on both fields zero and one in the key, and 2. partition on
> field zero only. This ensures all keys containing 'AA' go to the same
> reducer regardless of whether the flag is zero or one. Once the reducer
> code is invoked, you're guaranteed to see records in the order they were
> sorted (which, if #1 goes right, is what you're looking for).
>
> Sorry I can't help much with the streaming options, but hopefully this
> clears up any questions you have around the sort / partition / reducer
> record order semantics.
>
> On 2/10/10 6:13 PM, Winton Davies wrote:
>
>> I'm using streaming Hadoop, installed via Cloudera on EC2.
>>
>> My job should be straightforward:
>>
>> 1) Map task emits 2 keys and 1 value:
>>
>> <WORD> <FLAG, 0 or 1> <TEXT>
>>
>> e.g.
>>
>> AA 0 QUICK BROWN FOX
>> AA 1 QUICK BROWN FOX
>> BB 1 QUICK RED DOG
>>
>> 2) Reduce task, assuming all records for a given <WORD> arrive on its
>> standard input, runs through stdin. When the first key changes, it checks
>> whether the flag is 0 or 1: if it is 0, it emits all records for that
>> key; if it is 1, it skips all records for that key.
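For reference: in Hadoop Streaming the combiner is optional, so the usual way to have no combiner at all is simply to omit the -combiner flag. Note also that the framework may apply a combiner more than once per record (e.g. during spill merges), so a cat combiner's output-record counter can legitimately exceed the map-output count even without duplicate data reaching the reducers. A sketch of the invocation with the flag dropped (paths and options copied from the run script quoted later in this thread; untested):

```shell
# Same streaming invocation, but with the -combiner line removed entirely,
# which is how you run with no combiner in Hadoop Streaming.
hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-0.20.1+152-streaming.jar \
  -D stream.num.map.output.key.fields=2 \
  -D mapred.text.key.partitioner.options="-k1,1" \
  -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapred.text.key.comparator.options="-k1,1 -k2,2" \
  -file "$files" \
  -input input \
  -output output \
  -mapper mapper.rb \
  -reducer reducer.rb \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
```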
>> My run script is here:
>>
>> hadoop jar \
>>   /usr/lib/hadoop-0.20/contrib/streaming/hadoop-0.20.1+152-streaming.jar \
>>   -D stream.num.map.output.key.fields=2 \
>>   -D mapred.text.key.partitioner.options="-k1,1" \
>>   -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
>>   -D mapred.text.key.comparator.options="-k1,1 -k2,2" \
>>   -file $files \
>>   -input input \
>>   -output output \
>>   -mapper mapper.rb \
>>   -reducer reducer.rb \
>>   -combiner /bin/cat \
>>   -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
>> hadoop dfs -get output .
>>
>> No matter what I do, I do not get the desired effect of partitioning on
>> the first key with the reduce input sorted by KEY0 and then by KEY1 -- it
>> appears to work just fine in a single-node test case, but as soon as I
>> run it on a 32-node Hadoop cluster, it breaks. I don't really have any
>> sense of what is going on, other than that perhaps I do not understand
>> the subtleties between partitioning and ordering the input to the reduce
>> task. It's possible also that I misunderstand how the reducer is fed its
>> data, but again, my test example doesn't exhibit the problem.
>>
>> The reducer code is here:
>>
>> #!/usr/bin/env ruby
>> lastkey = nil
>> noskip  = true
>> STDIN.each_line do |line|
>>   keyval = line.strip.split("\t")
>>   # New key! If the flag (second field) is 0 after a key change,
>>   # we are going to output this key's records.
>>   if lastkey != keyval[0] then
>>     noskip  = (keyval[1] == "0")
>>     lastkey = keyval[0]
>>   end
>>   puts line.strip if noskip
>> end
>>
>> Thanks so much for any comments,
>> Winton
>
> --
> Eric Sammer
> [email protected]
> http://esammer.blogspot.com
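The reducer's intended per-key semantics can be exercised off-cluster with a small sketch (the function name `filter_records` is hypothetical, introduced here only for testing; plain Ruby, no Hadoop involved):

```ruby
# Sketch of the reducer's skip logic from the thread, factored into a
# function so it can be run without Hadoop. Records are "key\tflag\ttext"
# lines, assumed already sorted by key and then flag, as the streaming
# job's comparator options are meant to guarantee.
def filter_records(lines)
  out = []
  lastkey = nil
  noskip = true
  lines.each do |line|
    keyval = line.strip.split("\t")
    if lastkey != keyval[0]
      # On a key change, the group's first flag decides the whole group:
      # flag "0" first => emit every record of the key; "1" first => skip.
      noskip = (keyval[1] == "0")
      lastkey = keyval[0]
    end
    out << line.strip if noskip
  end
  out
end

input = [
  "AA\t0\tQUICK BROWN FOX",
  "AA\t1\tQUICK BROWN FOX",
  "BB\t1\tQUICK RED DOG",
]
puts filter_records(input)
# Emits both AA records (AA's first flag is 0) and drops the BB record.
```

This only reproduces the single-node behavior; whether each key's records actually arrive contiguously and in flag order on the cluster depends on the partitioner and comparator settings discussed above.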
