Hi Josh,

FYI it worked like a charm. Thanks for your help.


Piotr



________________________________
 From: Josh Wills <[email protected]>
To: [email protected]; Peter Knap <[email protected]> 
Sent: Wednesday, December 12, 2012 12:30 AM
Subject: Re: Combiner question
 

Please do; I'd be curious to know whether it works.

J



On Tue, Dec 11, 2012 at 10:28 PM, Peter Knap <[email protected]> wrote:

>You are right, it might work; I didn't think about using maps. I'm curious what
>the overhead of using them would be, though. I'll try it out tomorrow and let
>you know.
>
>Thanks a lot,
>Piotr
>
>________________________________
> From: Josh Wills <[email protected]>
>
>To: [email protected]; Peter Knap <[email protected]> 
>Sent: Wednesday, December 12, 2012 12:15 AM
>Subject: Re: Combiner question
> 
>
>
>If your secondary key is a string (or if you wouldn't mind treating it as a 
>string), then a combiner strategy can still work for you. Something like:
>
>
>PTable<K, Map<String, Pair<Integer, Collection<Float>>>> pt = ...
>
>
>with a PType of tableOf(strings(), maps(pairs(ints(), collections(floats())))),
>and I would strongly recommend using import static
>org.apache.crunch.types.avro.Avros.* in order to make that compact to express
>and fast to run. Then your combiner
>could do the aggregations on the Map<String, Pair<Integer, Collection<Float>>> 
>entries to compute the averages for each secondary key (reducing the IO) while 
>still passing all of the values for the same primary key to the same reducer. 
>That was a pattern that Sawzall supported that I always really liked and would 
>like to have in Crunch as well. What do you think?
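The merge Josh describes can be sketched in plain Java, without the Crunch runtime, to show what a combine step would do with the map values for one primary key. This is an illustration under assumptions, not Crunch code: the Integer in Pair<Integer, Collection<Float>> is read as a record count and the floats as per-column running sums, and SecondaryKeyCombine, Partial and mergePartials are hypothetical names. Carrying (count, sums) rather than averages matters because partial averages do not merge: combining avg(1, 2, 3) = 2 with avg(10) = 10 by averaging gives 6, while the true mean is 16 / 4 = 4. In Crunch, this loop would presumably sit inside the process method of a CombineFn passed to combineValues.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SecondaryKeyCombine {

    // Hypothetical stand-in for Crunch's Pair<Integer, Collection<Float>>:
    // a record count plus one running sum per value column (val1 ... valN).
    static final class Partial {
        final int count;
        final List<Float> sums;

        Partial(int count, List<Float> sums) {
            this.count = count;
            this.sums = sums;
        }
    }

    // Adds two partials for the same secondary key, column by column.
    // Assumes both carry the same number of value columns.
    static Partial mergePartials(Partial a, Partial b) {
        List<Float> sums = new ArrayList<>(a.sums.size());
        for (int i = 0; i < a.sums.size(); i++) {
            sums.add(a.sums.get(i) + b.sums.get(i));
        }
        return new Partial(a.count + b.count, sums);
    }

    // Folds every map seen for one main key into a single map keyed by
    // secondary key. The output has the same shape as the input, so the same
    // logic can run map-side (as a combiner) and again reduce-side.
    static Map<String, Partial> combine(Iterable<Map<String, Partial>> values) {
        Map<String, Partial> out = new HashMap<>();
        for (Map<String, Partial> m : values) {
            for (Map.Entry<String, Partial> e : m.entrySet()) {
                out.merge(e.getKey(), e.getValue(), SecondaryKeyCombine::mergePartials);
            }
        }
        return out;
    }
}
```

Because every main key still reaches a single reducer, the reducer sees all secondary keys for that main key, which is what lets the whole thing stay in one MapReduce job.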
>
>
>J
>
>
>
>On Tue, Dec 11, 2012 at 10:04 PM, Peter Knap <[email protected]> wrote:
>
>>Hi Josh,
>>
>>Thanks for the quick reply. Here is my problem:
>>
>>My mappers will produce a lot of records with the same key, which I will
>>aggregate in the reducers. To cut down on the I/O, I wanted to apply some
>>aggregation on the map side. At the same time, on the reducer side I want to
>>aggregate across the mappers' output and produce the final aggregation and
>>format transformation. For example, my mapper output will be:
>>
>>Key: <main key>           Value: <secondary key> <val1> ... <val N>
>>
>>I can aggregate (average) data for records with the same (<main key>,
>><secondary key>) pair by having the combiner produce:
>>
>>
>>Key: <main key>           Value: <secondary key> <avg(val1)> ... <avg(val N)>
>>
>>
>>This reduces the amount of I/O a lot.
>>
>>
>>
>>Now my reducer will use just <main key> to produce the final output:
>>
>>
>><main key>                  <secondary key> <avg(val1)> ... <avg(val N)> | 
>><secondary key> <avg(val1)> ... <avg(val N)> | .........
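A plain-Java sketch of the final formatting step, roughly what the DoFn applied after combineValues might do with the combined map for one main key. It assumes the (count, per-column sums) representation rather than pre-computed averages, so the division happens only once all partials are in. FormatOutput, Partial and formatLine are hypothetical names, and the tab after the main key and the alphabetical secondary-key order are choices the thread does not specify.

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

public class FormatOutput {

    // Hypothetical per-secondary-key partial: a record count plus one running
    // sum per value column, as produced by a combine step.
    static final class Partial {
        final int count;
        final List<Float> sums;

        Partial(int count, List<Float> sums) {
            this.count = count;
            this.sums = sums;
        }
    }

    // Builds one output line for a main key:
    // <main key>\t<secondary key> <avg(val1)> ... <avg(valN)> | <secondary key> ...
    // The tab separator and sorted secondary-key order are assumptions.
    static String formatLine(String mainKey, Map<String, Partial> combined) {
        StringJoiner groups = new StringJoiner(" | ");
        for (Map.Entry<String, Partial> e : new TreeMap<>(combined).entrySet()) {
            Partial p = e.getValue();
            StringJoiner group = new StringJoiner(" ");
            group.add(e.getKey());
            for (float sum : p.sums) {
                // Divide only now, so each average uses the full record count.
                group.add(Float.toString(sum / p.count));
            }
            groups.add(group.toString());
        }
        return mainKey + "\t" + groups;
    }
}
```

For example, a main key "k1" whose combined map holds "a" with count 1 and sums [4, 1] and "b" with count 2 and sums [3, 5] would format as `k1	a 4.0 1.0 | b 1.5 2.5`.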
>>
>>
>>
>>I was hoping to do it in just one M/R job, but all I could come up with was:
>>
>>
>>PTable<K, V> myTable = ...;
>>myTable.groupByKey()
>>    .combineValues(CombineFn/Aggregator to do the combine step)
>>    .groupByKey()
>>    .parallelDo(DoFn to aggregate & transform result of CombineFn to another 
>>format for output)
>>
>>But that's 2 M/R jobs.
>>
>>
>>
>>Thanks,
>>Piotr
>>
>>
>>
>>
>>________________________________
>> From: Josh Wills <[email protected]>
>>To: [email protected]; Peter Knap <[email protected]> 
>>Sent: Tuesday, December 11, 2012 11:44 PM
>>Subject: Re: Combiner question
>> 
>>
>>
>>Hey Peter,
>>
>>
>>We might need some more details on what you're trying to do. You're allowed 
>>to add additional parallelDo operations after the combineValues operation, 
>>e.g.,
>>
>>
>>PTable<K, V> myTable = ...;
>>myTable.groupByKey()
>>    .combineValues(CombineFn/Aggregator to do the combine step)
>>    .parallelDo(DoFn to transform result of CombineFn to another format for 
>>output)
>>
>>
>>is perfectly valid.
>>
>>
>>J
>>
>>
>>
>>On Tue, Dec 11, 2012 at 9:41 PM, Peter Knap <[email protected]> wrote:
>>
>>>Hi guys,
>>>
>>>
>>>I started a small POC with Crunch as a replacement for the current Python
>>>implementation, and I ran into a problem using combiners. How would one
>>>specify a combiner that is different from the reducer? I know that's not a
>>>typical case, but I want to have partial optimization on the map side, and at
>>>the same time the output format from the reducer is different from the
>>>combiner's, so I need two distinct classes. From looking at the code I can't
>>>figure out how to do it. Any help would be greatly appreciated.
>>>
>>>
>>>
>>>Thanks,
>>>Piotr
>>>
>>
>>
>>
>
>
>
>-- 
>
>Director of Data Science
>Cloudera
>Twitter: @josh_wills
>
>
>


-- 

Director of Data Science
Cloudera
Twitter: @josh_wills
