Winton:

I don't know the exact streaming options you're looking for, but what you have looks correct. Generally, to do what you want, all you should have to do is (1) sort on both key fields, zero and one, and (2) partition on field zero only. Partitioning on field zero alone ensures all keys containing 'AA' go to the same reducer regardless of whether the flag is 0 or 1. Once the reducer code is invoked, you're guaranteed to see records in the order they were sorted (which, if step 1 goes right, is what you're looking for).
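
To make those semantics concrete, here is a minimal Ruby sketch that simulates the shuffle in memory. It is an illustration only: NUM_REDUCERS, the sum-of-bytes hash, and the method names are assumptions made up for the example, not Hadoop's actual partitioner or API. It partitions on field zero only and then sorts each reducer's input on fields zero and one.

```ruby
# Simulate the shuffle: partition on field 0 only, sort on fields 0 and 1.
# Records are [word, flag, text]. NUM_REDUCERS and the sum-of-bytes hash are
# illustrative stand-ins, not Hadoop's real partitioner.
NUM_REDUCERS = 4

RECORDS = [
  ["BB", "1", "QUICK RED DOG"],
  ["AA", "1", "QUICK BROWN FOX"],
  ["AA", "0", "QUICK BROWN FOX"],
]

# Pick a reducer from the first key field only, so every record for a word
# lands in the same bucket regardless of its flag.
def partition(record, num_reducers)
  record[0].bytes.sum % num_reducers
end

# Group records by reducer, then sort each reducer's input on (word, flag).
def shuffle(records, num_reducers)
  records
    .group_by { |r| partition(r, num_reducers) }
    .transform_values { |rs| rs.sort_by { |r| [r[0], r[1]] } }
end

# Print each reducer's input in the order the reducer would read it.
shuffle(RECORDS, NUM_REDUCERS).sort.each do |reducer, rs|
  rs.each { |r| puts "reducer #{reducer}: #{r.join("\t")}" }
end
```

In this toy model both 'AA' records end up in one bucket, with the flag-0 record ahead of the flag-1 record, which is the order the reducer depends on.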

Sorry I can't help much with the streaming options, but hopefully this clears up any questions you have around the sort / partition / reducer record order semantics.

On 2/10/10 6:13 PM, Winton Davies wrote:
I'm using Hadoop streaming, installed via Cloudera on EC2.

My job should be straightforward:

1) Map task, emits 2 keys and 1 VALUE

    <WORD><FLAG, 0 or 1><TEXT>

eg

AA  0 QUICK BROWN FOX
AA  1 QUICK BROWN FOX
BB  1 QUICK RED DOG


2) Reduce Task, assuming all records for a <WORD>, with their flags, are in
its standard input, runs thru stdin. When the 1st key changes, it checks
whether the flag is 0 or 1. If it is 0, it emits all records of that key. If
it is 1, it skips all records of that key.


My run script is here:

hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-0.20.1+152-streaming.jar \
         -D stream.num.map.output.key.fields=2 \
         -D mapred.text.key.partitioner.options="-k1,1" \
         -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
         -D mapred.text.key.comparator.options="-k1,1 -k2,2" \
         -file $files \
         -input input \
         -output output \
         -mapper mapper.rb \
         -reducer reducer.rb \
         -combiner /bin/cat \
         -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
hadoop dfs -get output .

No matter what I do, I do not get the desired effect of partitioning on KEY0,
with the reduce input sorted by KEY0 and then by KEY1. It appears to work
just fine on a single-node test case, but as soon as I run it on a 32-node
Hadoop cluster, it breaks. I don't really have any sense of what is going
on, other than perhaps I do not understand the subtleties between
partitioning and ordering the input to the reduce task. It's possible also
that I misunderstand how the reducer is fed its data, but again, my test
example doesn't exhibit the problem.

The reducer code is here:
#!/usr/bin/env ruby
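# Streaming reducer: for each WORD, emit all of its records if the first
# record seen for that WORD has flag 0; otherwise skip them all.
# Input lines are WORD<TAB>FLAG<TAB>TEXT.
lastkey=nil
noskip=true
STDIN.each_line do |line|
   keyval = line.strip.split("\t")
   # On a key change, decide from the first record's flag whether to output
   # this key's records.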
   if lastkey != keyval[0] then
     noskip = ( keyval[1] == "0" )
     lastkey = keyval[0]
   end
   puts line.strip if noskip
end
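
The reducer's dependence on input order can be checked without a cluster. The sketch below wraps the same skip/keep rule in a method and runs it on two in-memory inputs; filter_records, SORTED and UNSORTED are hypothetical names introduced for the example, and the sample lines are the three records shown earlier.

```ruby
# Same skip/keep rule as the reducer above, wrapped in a method so it can be
# run on in-memory lines. filter_records is a hypothetical helper name.
def filter_records(lines)
  lastkey = nil
  noskip = true
  out = []
  lines.each do |line|
    keyval = line.strip.split("\t")
    # On a key change, the first record's flag decides the whole group.
    if lastkey != keyval[0]
      noskip = (keyval[1] == "0")
      lastkey = keyval[0]
    end
    out << line.strip if noskip
  end
  out
end

# Input as the reducer expects it: sorted by word, then by flag.
SORTED = [
  "AA\t0\tQUICK BROWN FOX",
  "AA\t1\tQUICK BROWN FOX",
  "BB\t1\tQUICK RED DOG",
]

# Same records, but the flag-1 record for AA arrives first, as it could if
# the secondary sort on the flag were not applied.
UNSORTED = [
  "AA\t1\tQUICK BROWN FOX",
  "AA\t0\tQUICK BROWN FOX",
  "BB\t1\tQUICK RED DOG",
]

puts "sorted input kept:   #{filter_records(SORTED).length}"
puts "unsorted input kept: #{filter_records(UNSORTED).length}"
```

With the sorted input both AA records are kept; with the flag-1 record first, the whole AA group is skipped, so a missing secondary sort silently changes the output.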



Thanks so much for any comments,
  Winton



--
Eric Sammer
[email protected]
http://esammer.blogspot.com
