Ning,

Can you explain how it comes up with the estimate at compile time, if the #
of distinct join keys is known only at runtime?  Is this just based on the
number of hdfs blocks of the relations being joined?

Thanks.

On Wed, Nov 11, 2009 at 9:21 PM, Ning Zhang <[email protected]> wrote:

> Your understanding is mostly correct, but the number of reducers is
> estimated at compile time. It is not a strict 1:1 mapping between distinct
> join keys and the number of reducers, since statistics are currently not
> available at compile time.
>
> Your solution is correct in theory, but it is not trivial to partition
> mapper output with the same key across different reducers. It would
> probably require adding a "grouping ID" to the sorting key before sending
> rows to the reducers.
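>
> (A hedged sketch of that "grouping ID" idea at the query level -- not
> something Hive does itself. Assuming a small helper table `salts` holding
> the integers 0 through 7 (hypothetical, not from this thread), you can tag
> each row on one side with a random salt and replicate the other side once
> per salt value, so rows are routed to reducers by (dt, salt) rather than
> by dt alone:
>
> 8<--------
> insert overwrite table foo1
> select a.aid as aid_1, b.aid as aid_2, count(1), a.dt as dt
> -- left side: each row gets one random salt in 0..7
> from (select m1.*, cast(rand() * 8 as int) as salt from m1) a
> -- right side: every row replicated once per salt value
> join (select m1.*, s.salt from m1 join salts s) b
>   on a.dt = b.dt and a.salt = b.salt
> where a.aid <> b.aid and a.aid < b.aid
> group by a.aid, b.aid, a.dt;
> 8<--------
>
> Each matching pair still meets exactly once, but the work for each
> distinct dt is now spread over up to 8 reducers, at the cost of reading
> m1 an extra 8 times on the replicated side.)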
>
> Ning
>
>
> On Nov 11, 2009, at 12:21 PM, Defenestrator wrote:
>
> Thanks for the explanation, Ning.  This explains the behavior I'm seeing
> with the original query I was trying to run: it does a cartesian product
> for each dt (of which there are only two), and the two reducers were
> running very slowly.
>
> insert overwrite table foo1
> select m1.aid as aid_1, m2.aid as aid_2, count(1), m1.dt as dt
> from m1 m1 join m1 m2 on m1.dt = m2.dt
> where m1.aid <> m2.aid and m1.aid < m2.aid
> group by m1.aid, m2.aid, m1.dt;
>
>
>
> Here's my understanding of the issue; please correct me if I'm wrong.
>  Because there are only two distinct "dt" values, Hive will allocate only
> a single reducer per distinct "dt" value to do the cartesian product.
>
> And this problem doesn't necessarily have anything to do with skewed data,
> right?  Suppose I have a very large dataset with an even distribution over
> x join key values, each of which produces a lot of join output tuples;
> Hive will then have x reducers that all run very slowly.  And the real
> solution to this problem is for Hive to have multiple reducers computing
> the cartesian product for each distinct join key value, correct?
>
> On Wed, Nov 11, 2009 at 11:16 AM, Ning Zhang <[email protected]> wrote:
>
>> I think it depends on how many rows there are in each table and what the
>> distribution of the join keys is. I suspect that your data are very
>> skewed, so that a lot of rows in table A have the same join key as a lot
>> of rows in table B. That will produce a huge number of rows in the join
>> result. Hive currently uses at most 1 reducer for each distinct join key,
>> so that reducer may be very slow. There are some JIRAs filed for this
>> problem. I don't know if anyone is actively working on it, but it would
>> be a great research project.
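>>
>> (A quick skew check on the join key -- a hedged sketch using the table
>> and column names from this thread, not part of the original message:
>>
>> select dt, aid, count(1) as cnt   -- rows per distinct join key
>> from m1
>> group by dt, aid
>> order by cnt desc                 -- hottest keys first
>> limit 20;
>>
>> If a handful of (dt, aid) pairs dominate the counts, those are the keys
>> that pin single reducers.)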
>>
>> Another way you may be able to optimize is to rewrite your query to use
>> semijoin, if you can. Semijoin was just checked in to trunk, so you
>> probably need to check out trunk and try it out. If you can rewrite the
>> query using semijoin, you can avoid the cartesian product. The basic idea
>> of semijoin is to implement the semantics of IN/EXISTS subqueries, which
>> Hive doesn't support yet. If you have a SQL query like:
>>
>> select m1.aid, m1.dt, count(1)
>> from m1 m1
>> where exists (select null from m1 m2 where m1.aid = m2.aid and
>> m1.dt=m2.dt)
>> group by m1.aid, m1.dt;
>>
>> you can rewrite it using left semi join in HiveQL as
>>
>> select m1.aid, m1.dt, count(1)
>> from m1 m1 left semi join m1 m2 on (m1.aid = m2.aid and m1.dt=m2.dt)
>> group by m1.aid, m1.dt;
>>
>> Note that you cannot 'select' any columns from m2, the right-hand-side
>> table of the left semi join, just as in the above EXISTS subquery you
>> cannot reference inner-query tables from the outer query.
>>
>> The benefit of using semijoin rather than inner join is that if there are
>> a lot of rows with the same join key in m1 and m2, it will return after
>> the first match, without doing a cartesian product of all matching rows
>> in m1 and m2.
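>>
>> (A hypothetical illustration of the difference: if m1 contains 1,000 rows
>> sharing one (aid, dt) value, the inner self-join emits 1,000 x 1,000 =
>> 1,000,000 joined rows for that key before aggregation, while the semijoin
>> forwards each left-side row at most once:
>>
>> select count(1)
>> from m1 a left semi join m1 b on (a.aid = b.aid and a.dt = b.dt);
>> -- every row of m1 trivially matches itself, so the join stage forwards
>> -- one row per row of m1 (1,000 for that key), not 1,000,000.
>> )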
>>
>> Of course whether you can rewrite this depends on your application
>> semantics. If you really want all the combinations of the rows from m1 AND
>> m2, semijoin won't help you.
>>
>> Ning
>>
>> On Nov 11, 2009, at 10:45 AM, Ryan LeCompte wrote:
>>
>> I have two sequence file tables with 10GB each, and it's exhibiting the
>> same problem of the final reducer just never finishing.
>>
>> Any ideas there? Have you tried joining across such large tables?
>>
>> Ryan
>>
>>
>>
>>
>> On Nov 11, 2009, at 1:37 PM, Ning Zhang <[email protected]> wrote:
>>
>> The problem is in the data loading process: the m1 file is in plain-text
>> CSV format, but you are loading it into a Hive table with the default
>> setting, which assumes fields are separated by ctrl-A. So if you look at
>> the first 10 rows, all fields are NULL, since Hive cannot find a ctrl-A
>> in any row. Your query is therefore actually doing a cartesian product of
>> 100k x 100k rows.
>>
>> Since the 'LOAD DATA' command doesn't check the input format, nor does it
>> transform the format, you need to specify the input format in the CREATE
>> TABLE DDL. Following is a working example. It finishes in my unit test
>> (single machine) in less than 3 minutes.
>>
>> 8<--------
>> drop table m1;
>> drop table foo1;
>>
>> create table m1 (
>> mid int,
>> aid int,
>> dt string)
>> row format delimited fields terminated by ','
>> stored as textfile;
>>
>> LOAD DATA LOCAL INPATH '../data/files/m1' OVERWRITE INTO TABLE m1;
>>
>> select * from m1 limit 10;
>>
>> create table foo1 (
>> aid_1 int,
>> aid_2 int,
>> mid bigint,
>> dt bigint
>> );
>>
>>
>> insert overwrite table foo1
>> select m1.aid as aid_1, m2.aid as aid_2, count(1), m1.dt as dt
>> from m1 m1 join m1 m2 on m1.aid = m2.aid and m1.dt = m2.dt group by
>> m1.aid, m2.aid, m1.dt;
>> 8<------------
>>
>> On Nov 10, 2009, at 11:19 PM, Defenestrator wrote:
>>
>> Definitely the join portion of the plan.  The one reduce job takes over
>> 2 1/2 hours to complete, doing the following:
>>
>>
>> 2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 1 rows
>> 2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 1 rows
>> 2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 10 rows
>> 2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 10 rows
>> 2009-11-10 18:14:30,311 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 100 rows
>> 2009-11-10 18:14:30,311 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 100 rows
>> 2009-11-10 18:14:30,328 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 1000 rows
>> 2009-11-10 18:14:30,328 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 1000 rows
>> 2009-11-10 18:14:30,445 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 10000 rows
>> 2009-11-10 18:14:30,446 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 10000 rows
>> 2009-11-10 18:14:30,560 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 100000 rows
>> 2009-11-10 18:14:30,560 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 100000 rows
>> 2009-11-10 18:14:31,431 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 1000000 rows
>> 2009-11-10 18:14:31,431 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 1000000 rows
>> 2009-11-10 18:14:32,384 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 2000000 rows
>>
>> ...
>>
>> 2009-11-10 20:53:19,459 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 9999000000 rows
>> 2009-11-10 20:53:19,459 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 9999000000 rows
>> 2009-11-10 20:53:20,374 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 10000000000 rows
>> 2009-11-10 20:53:20,374 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 10000000000 rows
>>
>>
>>
>> On Tue, Nov 10, 2009 at 9:07 PM, Namit Jain <[email protected]> wrote:
>>
>>> I think you missed the attachment.
>>>
>>>
>>> Which job is taking more time – join or group by?
>>>
>>> Can you send the data characteristics for m1 and foo1 – is it possible
>>> that there is a large skew on aid and dt which is forcing the data to
>>> be sent to a single reducer?
>>>
>>>
>>>
>>> -namit
>>>
>>>
>>>
>>> On 11/10/09 6:35 PM, "Defenestrator" <[email protected]> wrote:
>>>
>>>   I would definitely appreciate any insights on this from the list.  I
>>> tried to reduce the query down to something that is easily understood,
>>> and Hive still demonstrates pretty poor join performance on a three-node
>>> hadoop cluster.
>>>
>>> drop table m1;
>>> drop table foo1;
>>>
>>> create table m1 (
>>> mid int,
>>> aid int,
>>> dt string);
>>>
>>> LOAD DATA LOCAL INPATH 'm1' OVERWRITE INTO TABLE m1;
>>>
>>> create table foo1 (
>>> aid_1 int,
>>> aid_2 int,
>>> mid bigint,
>>> dt bigint
>>> );
>>>
>>> set mapred.reduce.tasks=32;
>>>
>>> insert overwrite table foo1
>>> select m1.aid as aid_1, m2.aid as aid_2, count(1), m1.dt as dt
>>> from m1 m1 join m1 m2 on m1.aid = m2.aid and m1.dt = m2.dt group by
>>> m1.aid, m2.aid, m1.dt;
>>>
>>>
>>> Attached is the file I'm using, which only has 100k rows.  I've looked
>>> at the benchmark
>>> (http://issues.apache.org/jira/secure/attachment/12411185/hive_benchmark_2009-06-18.pdf)
>>> and hive seems to be able to join much bigger data sets.  I also tried
>>> running the same query on a single-node DBMS on my desktop, and it was
>>> able to return results in less than 3 minutes, while Hive has been
>>> running for at least 20 minutes now.
>>>
>>> Thanks.
>>>
>>> On Tue, Nov 10, 2009 at 3:53 PM, Ryan LeCompte <[email protected]> wrote:
>>>
>>>  Any thoughts on this? I've only had luck by reducing the data on each
>>> side of the join. Is this something Hive might be able to improve in a
>>> future release of the query-plan optimizer?
>>>
>>> Thanks,
>>> Ryan
>>>
>>>
>>>
>>> On Nov 3, 2009, at 10:55 PM, Ryan LeCompte <[email protected]> wrote:
>>>
>>>   I've had a similar issue with a small cluster. Is there any way that
>>> you can reduce the size of the data being joined on both sides? If you
>>> search the forums for "join issue", you will see the thread for my issue
>>> and get some tips.
>>>
>>> Thanks,
>>> Ryan
>>>
>>>
>>>
>>> On Nov 3, 2009, at 10:45 PM, Defenestrator <[email protected]> wrote:
>>>
>>>  I was able to increase the number of reduce jobs manually to 32.
>>>  However, it finishes 28 of them, and the other 4 have the same behavior
>>> of using 100% cpu and consuming a lot of memory.  I suspect it might be
>>> an issue with the reduce jobs themselves - is there a way to figure out
>>> what these jobs are doing exactly?
>>>
>>> Thanks.
>>>
>>> On Tue, Nov 3, 2009 at 6:53 PM, Namit Jain <[email protected]> wrote:
>>>
>>> The number of reducers is inferred from the input data size. But you
>>> can always override it by setting mapred.reduce.tasks.
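>>>
>>> (A hedged sketch of the knobs involved; the config names are from Hive
>>> and Hadoop of this era, and the estimate is roughly
>>> total_input_bytes / bytes_per_reducer:
>>>
>>> set hive.exec.reducers.bytes.per.reducer=256000000; -- smaller => more reducers
>>> set mapred.reduce.tasks=32; -- or just force a fixed count
>>> )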
>>>
>>>
>>>
>>>
>>> *From:* Defenestrator [mailto:[email protected]]
>>> *Sent:* Tuesday, November 03, 2009 6:46 PM
>>>
>>> *To:* [email protected]
>>>
>>> *Subject:* Re: Self join problem
>>>
>>>
>>> Hi Namit,
>>>
>>>
>>>
>>> Thanks for your suggestion.
>>>
>>>
>>>
>>> I tried changing the query as you had suggested, moving the m1.dt =
>>> m2.dt predicate into the ON clause.  It increased the number of reduce
>>> jobs to 2.  So now there are two processes running on two nodes at 100%
>>> cpu, consuming a lot of memory.  Is there a reason why hive doesn't
>>> spawn more reduce jobs for this query?
>>>
>>>
>>>
>>> On Tue, Nov 3, 2009 at 4:47 PM, Namit Jain <[email protected]> wrote:
>>>
>>> Put the join condition in the ON clause:
>>>
>>>
>>> insert overwrite table foo1
>>> select m1.id as id_1, m2.id as id_2, count(1), m1.dt
>>> from m1 join m2 on m1.dt = m2.dt
>>> where m1.id <> m2.id and m1.id < m2.id
>>> group by m1.id, m2.id, m1.dt;
>>>
>>>
>>>
>>>
>>> *From:* Defenestrator [mailto:[email protected]]
>>> *Sent:* Tuesday, November 03, 2009 4:44 PM
>>> *To:* [email protected]
>>>
>>> *Subject:* Self join problem
>>>
>>>
>>>
>>> Hello,
>>>
>>>
>>>
>>> I'm trying to run the following query where m1 and m2 have the same data
>>> (>29M rows) on a 3-node hadoop cluster.  I'm essentially trying to do a self
>>> join.  It ends up running 269 map jobs and 1 reduce job.  The map jobs
>>> complete but the reduce job just runs on one process on one of the hadoop
>>> nodes at 100% cpu utilization and just slowly increases in memory
>>> consumption.  The reduce job never goes beyond 82% complete despite letting
>>> it run for a day.
>>>
>>>
>>>
>>> I am running on 0.5.0 based on this morning's trunk.
>>>
>>>
>>>
>>> insert overwrite table foo1
>>>
>>> select m1.id as id_1, m2.id as id_2, count(1), m1.dt
>>>
>>> from m1 join m2 where m1.id <> m2.id and m1.id < m2.id and m1.dt = m2.dt
>>> group by m1.id, m2.id, m1.dt;
>>>
>>>
>>>
>>>
>>> Any input would be appreciated.
>>>
>>>
>>>
>>>
>>>
>>
>>
>>
>
>
