Thanks Eric,

I think I may have found the cause of the problem, but have no idea how to
do fix it.

My mapper is STDOUT.puts "key1 tab key2 tab text" -- and the job tracker
shows the total number of records being emitted as
say 35 million.

it then goes thru -combiner /bin/cat (ie a NOOP, in theory)

The job tracker however shows 70 million output records.

So, it seems to me like that something isnt quite working correctly here,
perhaps like a Double NewLine being inserted? Something else?  I have not a
clue. Do you know the syntax for not having ANY combiner, or where I could
find such documentation?

Cheers,
 Winton




On Wed, Feb 10, 2010 at 4:02 PM, E. Sammer <[email protected]> wrote:

> Winton:
>
> I don't know the exact streaming options you're looking for, but what you
> have looks correct. Generally, to do what you want all you should have to do
> is 1. sort on both field zero and one in the key and 2. partition on only
> zero. This ensures all keys containing 'AA' go to the same reducer
> regardless of the zero or one. Once the reducer code is invoked, you're
> guaranteed to see records in the order they were sorted (which, if #1 goes
> right, is what you're looking for).
>
> Sorry I can't help much with the streaming options, but hopefully this
> clears up any questions you have around the sort / partition / reducer
> record order semantics.
>
>
> On 2/10/10 6:13 PM, Winton Davies wrote:
>
>> I'm using streaming hadoop, installed vua cloudera on ec2.
>>
>> My job should be straightforward:
>>
>> 1) Map task, emits 2 keys and 1 VALUE
>>
>>    <WORD><FLAG, 0 or 1><TEXT>
>>
>> eg
>>
>> AA  0 QUICK BROWN FOX
>> AA  1 QUICK BROWN FOX
>> BB  1 QUICK RED DOG
>>
>>
>> 2) Reduce Task, assuming<WORD>  are all in its standard input and flag,
>> runs
>> thru the stdin. When the 1st  key changes it checks to see if flag is 0 or
>> 1, if it is 0, it emits all records of that key. If it changes and is a 1
>> it
>> skips all records of that key.
>>
>>
>> My run script is here:
>>
>> hadoop jar
>> /usr/lib/hadoop-0.20/contrib/streaming/hadoop-0.20.1+152-streaming.jar \
>>         -D stream.num.map.output.key.fields=2 \
>>         -D mapred.text.key.partitioner.options="-k1,1"\
>>         -D
>>
>> mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
>> \
>>         -D mapred.text.key.comparator.options="-k1,1 -k2,2"\
>>         -file $files \
>>         -input input \
>>         -output output \
>>         -mapper mapper.rb \
>>         -reducer reducer.rb \
>>         -combiner /bin/cat \
>>         -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
>> hadoop dfs -get output .
>>
>> No matter what I do, I do not get the desired effect of partition on Key,
>> and the reduce input sorted by KEY0 and then by KEY1 -- it appears to wokr
>> just fine on a single node test case, but as soon as I run it on a 32 node
>> hadoop cluster, it breaks. I don't really have any sense on what is going
>> on, other than perhaps I do not understand the subtleties between
>> partitioning and ordering the input to the reduce task. It's possible also
>> that I misunderstand how the reducer is fed its data, but again, my test
>> example doesn't exhibit the problem.
>>
>> The reducer code is here:
>> #!/usr/bin/env ruby
>> #
>> #
>> lastkey=nil
>> noskip=true
>> STDIN.each_line do |line|
>>   keyval = line.strip.split("\t")
>>   # new key!
>>   # if the second value is 0 after a keychange then we are going to
>> output.
>>   if lastkey != keyval[0] then
>>     noskip = ( keyval[1] == "0" )
>>     lastkey = keyval[0]
>>   end
>>   puts line.strip if noskip
>> end
>>
>>
>>
>> Thanks so much for any comments,
>>  Winton
>>
>>
>
> --
> Eric Sammer
> [email protected]
> http://esammer.blogspot.com
>

Reply via email to