Hi Benjamin,

Thanks for letting us know. That means my original assumption was wrong:
the bags are not small. In fact, you can compute the average size of
bags as follows: total number of input records / ( reduce input groups x
number of reducers ).
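
As a rough sketch of that arithmetic in Python: the function and parameter
names are made up for illustration, and "reduce input groups" is assumed to
be a per-reducer figure, since the formula multiplies it by the number of
reducers. The 300 million records and 8 reducers come from the benchmark
quoted below; the group count is a hypothetical value.

```python
def average_bag_size(total_input_records, reduce_input_groups, num_reducers):
    """Average bag size = total input records / (reduce input groups x reducers).

    Assumption: reduce_input_groups is the number of groups seen by one
    reducer, so multiplying by num_reducers gives the total number of groups.
    """
    total_groups = reduce_input_groups * num_reducers
    if total_groups == 0:
        raise ValueError("no reduce input groups reported")
    return total_input_records / total_groups


# 300,000,000 records and 8 reducers are taken from the thread;
# 1,000,000 groups per reducer is a made-up number for illustration.
print(average_bag_size(300_000_000, 1_000_000, 8))
```

If the counter you read is already summed over all reducers, the
multiplication by the number of reducers would not apply.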

One more thing you can try is turning on "pig.exec.mapPartAgg". That may
help the mappers run faster. If this doesn't work, I'm out of ideas. :-)

Thanks,
Cheolsoo



On Sat, Aug 24, 2013 at 3:27 AM, Benjamin Jakobus <[email protected]> wrote:

> Hi Alan, Cheolsoo,
>
> I re-ran the benchmarks with and without the combiner. Enabling the
> combiner is faster:
>
> With combiner:
> real 668.44
> real 663.10
> real 665.05
>
> Without combiner:
> real 795.97
> real 810.51
> real 810.16
>
> Best Regards,
> Ben
>
>
> On 22 August 2013 16:33, Cheolsoo Park <[email protected]> wrote:
>
> > Hi Benjamin,
> >
> > To answer your question, how the Hadoop combiner works is that 1) mappers
> > write outputs to disk and 2) combiners read them, combine and write them
> > again. So you're paying extra disk I/O as well as
> > serialization/deserialization.
> >
> > This will pay off if combiners significantly reduce the intermediate
> > outputs that reducers need to fetch from mappers. But if reduction is not
> > significant, it will only slow down mappers. You can identify whether
> > this is really a problem by comparing the time spent by map and combine
> > functions in the task logs.
> >
> > What I usually do is:
> > 1) If there are many small bags, disable combiners.
> > 2) If there are many large bags, enable combiners. Furthermore, turning
> > on "pig.exec.mapPartAgg" helps. (See the Pig blog
> > <https://blogs.apache.org/pig/entry/apache_pig_it_goes_to> for details.)
> >
> > Thanks,
> > Cheolsoo
> >
> >
> > On Thu, Aug 22, 2013 at 4:01 AM, Benjamin Jakobus <[email protected]> wrote:
> >
> > > Hi Cheolsoo,
> > >
> > > Thanks - I will try this now and get back to you.
> > >
> > > Out of interest, could you explain (or point me towards resources that
> > > would) why the combiner would be a problem?
> > >
> > > Also, could the fact that Pig builds an intermediary data structure (?)
> > > whilst Hive just performs a sort and then the arithmetic operation
> > > explain the slowdown?
> > >
> > > (Apologies, I'm quite new to Pig/Hive - just my guesses).
> > >
> > > Regards,
> > > Benjamin
> > >
> > >
> > > On 22 August 2013 01:07, Cheolsoo Park <[email protected]> wrote:
> > >
> > > > Hi Benjamin,
> > > >
> > > > Thank you very much for sharing detailed information!
> > > >
> > > > 1) From the runtime numbers that you provided, the mappers are very
> > > > slow.
> > > >
> > > > Counter              Map        Reduce   Total
> > > > CPU time spent (ms)  5,081,610  168,740  5,250,350
> > > > CPU time spent (ms)  5,052,700  178,220  5,230,920
> > > > CPU time spent (ms)  5,084,430  193,480  5,277,910
> > > >
> > > > 2) In your GROUP BY query, you have an algebraic UDF "COUNT".
> > > >
> > > > I am wondering whether disabling the combiner will help here. I have
> > > > seen a lot of cases where the combiner actually hurts performance
> > > > significantly if it doesn't combine mapper outputs significantly.
> > > > Briefly looking at generate_data.pl in PIG-200, it looks like a lot of
> > > > random keys are generated. So I guess you will end up with a large
> > > > number of small bags rather than a small number of large bags. If
> > > > that's the case, the combiner will only add overhead to mappers.
> > > >
> > > > Can you try adding "set pig.exec.nocombiner true;" and see whether it
> > > > helps?
> > > >
> > > > Thanks,
> > > > Cheolsoo
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, Aug 21, 2013 at 3:52 AM, Benjamin Jakobus <[email protected]> wrote:
> > > >
> > > > > Hi Cheolsoo,
> > > > >
> > > > > >> What's your query like? Can you share it? Do you call any
> > > > > >> algebraic UDF after group by? I am wondering whether combiner
> > > > > >> matters in your test.
> > > > >
> > > > > I have been running 3 different types of queries.
> > > > >
> > > > > The first was performed on datasets of 6 different sizes:
> > > > >
> > > > >
> > > > >    - Dataset size 1: 30,000 records (772KB)
> > > > >    - Dataset size 2: 300,000 records (6.4MB)
> > > > >    - Dataset size 3: 3,000,000 records (63MB)
> > > > >    - Dataset size 4: 30 million records (628MB)
> > > > >    - Dataset size 5: 300 million records (6.2GB)
> > > > >    - Dataset size 6: 3 billion records (62GB)
> > > > >
> > > > > The datasets scale linearly, whereby the size equates to 3000 * 10^n.
> > > > > A seventh dataset consisting of 1,000 records (23KB) was produced to
> > > > > perform join operations on. Its schema is as follows:
> > > > > name - string
> > > > > marks - integer
> > > > > gpa - float
> > > > > The datasets were generated using the generate_data.pl Perl script
> > > > > available for download from
> > > > > https://issues.apache.org/jira/browse/PIG-200. The results are as
> > > > > follows:
> > > > >
> > > > >
> > > > >              Set 1   Set 2   Set 3   Set 4    Set 5     Set 6
> > > > > Arithmetic   32.82   36.21   49.49   83.25    423.63    3900.78
> > > > > Filter 10%   32.94   34.32   44.56   66.68    295.59    2640.52
> > > > > Filter 90%   33.93   32.55   37.86   53.22    197.36    1657.37
> > > > > Group        49.43   53.34   69.84   105.12   497.61    4394.21
> > > > > Join         49.89   50.08   78.55   150.39   1045.34   10258.19
> > > > > *Averaged performance of arithmetic, join, group, order, distinct
> > > > > select and filter operations on six datasets using Pig. Scripts were
> > > > > configured to use 8 reduce and 11 map tasks.*
> > > > >
> > > > >
> > > > >
> > > > >              Set 1   Set 2   Set 3   Set 4    Set 5     Set 6
> > > > > Arithmetic   32.84   37.33   72.55   300.08   2633.72   27821.19
> > > > > Filter 10%   32.36   53.28   59.22   209.5    1672.3    18222.19
> > > > > Filter 90%   31.23   32.68   36.8    69.55    331.88    3320.59
> > > > > Group        48.27   47.68   46.87   53.66    141.36    1233.4
> > > > > Join         48.54   56.86   104.6   517.5    4388.34   -
> > > > > Distinct     48.73   53.28   72.54   109.77   -         -
> > > > > *Averaged performance of arithmetic, join, group, distinct select and
> > > > > filter operations on six datasets using Hive. Scripts were configured
> > > > > to use 8 reduce and 11 map tasks.*
> > > > >
> > > > > (If you want to see the standard deviation, let me know).
> > > > >
> > > > > So, to summarize the results: Pig outperforms Hive, with the exception
> > > > > of using *Group By*.
> > > > >
> > > > > The Pig scripts used for this benchmark are as follows:
> > > > > *Arithmetic*
> > > > > -- Generate with basic arithmetic
> > > > > A = load '$input/dataset_300000000' using PigStorage('\t')
> > > > >     as (name, age, gpa) PARALLEL $reducers;
> > > > > B = foreach A generate age * gpa + 3, age/gpa - 1.5 PARALLEL $reducers;
> > > > > store B into '$output/dataset_300000000_projection' using PigStorage()
> > > > >     PARALLEL $reducers;
> > > > >
> > > > > *Filter 10%*
> > > > > -- Filter that removes 10% of data
> > > > > A = load '$input/dataset_300000000' using PigStorage('\t')
> > > > >     as (name, age, gpa) PARALLEL $reducers;
> > > > > B = filter A by gpa < '3.6' PARALLEL $reducers;
> > > > > store B into '$output/dataset_300000000_filter_10' using PigStorage()
> > > > >     PARALLEL $reducers;
> > > > >
> > > > >
> > > > > *Filter 90%*
> > > > > -- Filter that removes 90% of data
> > > > > A = load '$input/dataset_300000000' using PigStorage('\t')
> > > > >     as (name, age, gpa) PARALLEL $reducers;
> > > > > B = filter A by age < '25' PARALLEL $reducers;
> > > > > store B into '$output/dataset_300000000_filter_90' using PigStorage()
> > > > >     PARALLEL $reducers;
> > > > >
> > > > > *Group*
> > > > > A = load '$input/dataset_300000000' using PigStorage('\t')
> > > > >     as (name, age, gpa) PARALLEL $reducers;
> > > > > B = group A by name PARALLEL $reducers;
> > > > > C = foreach B generate flatten(group), COUNT(A.age) PARALLEL $reducers;
> > > > > store C into '$output/dataset_300000000_group' using PigStorage()
> > > > >     PARALLEL $reducers;
> > > > >
> > > > > *Join*
> > > > > A = load '$input/dataset_300000000' using PigStorage('\t')
> > > > >     as (name, age, gpa) PARALLEL $reducers;
> > > > > B = load '$input/dataset_join' using PigStorage('\t')
> > > > >     as (name, age, gpa) PARALLEL $reducers;
> > > > > C = cogroup A by name inner, B by name inner PARALLEL $reducers;
> > > > > D = foreach C generate flatten(A), flatten(B) PARALLEL $reducers;
> > > > > store D into '$output/dataset_300000000_cogroup_big' using PigStorage()
> > > > >     PARALLEL $reducers;
> > > > >
> > > > > Similarly, here are the Hive scripts:
> > > > > *Arithmetic*
> > > > > SELECT (dataset.age * dataset.gpa + 3) AS F1,
> > > > >        (dataset.age/dataset.gpa - 1.5) AS F2
> > > > > FROM dataset
> > > > > WHERE dataset.gpa > 0;
> > > > >
> > > > > *Filter 10%*
> > > > > SELECT *
> > > > > FROM dataset
> > > > > WHERE dataset.gpa < 3.6;
> > > > >
> > > > > *Filter 90%*
> > > > > SELECT *
> > > > > FROM dataset
> > > > > WHERE dataset.age < 25;
> > > > >
> > > > > *Group*
> > > > > SELECT COUNT(dataset.age)
> > > > > FROM dataset
> > > > > GROUP BY dataset.name;
> > > > >
> > > > > *Join*
> > > > > SELECT *
> > > > > FROM dataset JOIN dataset_join
> > > > > ON dataset.name = dataset_join.name;
> > > > >
> > > > > I will re-run the benchmarks to see whether it is the reduce or map
> > > > > side that is slower and get back to you later today.
> > > > >
> > > > > The other two benchmarks were slightly different: I performed
> > > > > transitive self joins in which Pig outperformed Hive. However, once I
> > > > > added a Group By, Hive began outperforming Pig.
> > > > >
> > > > > I also ran the TPC-H benchmarks and noticed that Hive (surprisingly)
> > > > > outperformed Pig. However, what *seems* to cause the actual performance
> > > > > difference is the heavy usage of the Group By operator in all but 3
> > > > > TPC-H test scripts.
> > > > >
> > > > > Re-running the scripts whilst omitting the grouping of data produces
> > > > > the expected results. For example, running script 3
> > > > > (q3_shipping_priority.pig) whilst omitting the Group By operator
> > > > > significantly reduces the runtime (to 1278.49 seconds real time runtime
> > > > > or a total of 12,257,630ms CPU time).
> > > > >
> > > > > The fact that the Group By operator skews the TPC-H benchmark in favour
> > > > > of Apache Hive is supported by further experiments: as noted earlier, a
> > > > > benchmark was carried out on a transitive self-join. The former took Pig
> > > > > an average of 45.36 seconds (real time runtime) to execute; it took Hive
> > > > > 56.73 seconds. The latter took Pig 157.97 and Hive 180.19 seconds
> > > > > (again, on average). However, adding the Group By operator to the
> > > > > scripts turned the tide: Pig is now significantly slower than Hive,
> > > > > requiring an average of 278.15 seconds. Hive, on the other hand,
> > > > > required only 204.01 seconds to perform the JOIN and GROUP operations.
> > > > >
> > > > > Real time runtime is measured using the time -p command.
> > > > >
> > > > > Best Regards,
> > > > > Benjamin
> > > > >
> > > > >
> > > > >
> > > > > On 20 August 2013 19:56, Cheolsoo Park <[email protected]> wrote:
> > > > >
> > > > > > Hi Benjamin,
> > > > > >
> > > > > > Can you describe which step of group by is slow? Mapper side or
> > > > > > reducer side?
> > > > > >
> > > > > > What's your query like? Can you share it? Do you call any algebraic
> > > > > > UDF after group by? I am wondering whether combiner matters in your
> > > > > > test.
> > > > > >
> > > > > > Thanks,
> > > > > > Cheolsoo
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, Aug 20, 2013 at 2:27 AM, Benjamin Jakobus <[email protected]> wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > After benchmarking Hive and Pig, I found that the Group By operator
> > > > > > > in Pig is drastically slower than Hive's. I was wondering whether
> > > > > > > anybody has experienced the same? And whether people may have any
> > > > > > > tips for improving the performance of this operation? (Adding a
> > > > > > > DISTINCT as suggested by an earlier post on here doesn't help. I am
> > > > > > > currently re-running the benchmark with LZO compression enabled.)
> > > > > > >
> > > > > > > Regards,
> > > > > > > Ben
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
