Your understanding is mostly correct, but the number of reducers is estimated at compile time. There is no strict 1:1 mapping between distinct join keys and the number of reducers, since the stats are not currently available at compile time.
Your solution is correct in theory, but it is not trivial to partition mapper output with the same key across different reducers. It would probably require adding a "grouping ID" to the sorting key before sending rows to the reducers.

Ning

On Nov 11, 2009, at 12:21 PM, Defenestrator wrote:

Thanks for the explanation, Ning. This explains the behavior that I'm seeing with the original query I was trying to run, which was doing a cartesian product for each dt (of which there are only 2), and two reducers were running very slowly.

insert overwrite table foo1
select m1.aid as aid_1, m2.aid as aid_2, count(1), m1.dt as dt
from m1 m1 join m1 m2 on m1.dt = m2.dt
where m1.aid <> m2.aid and m1.aid < m2.aid
group by m1.aid, m2.aid, m1.dt;

Here's my understanding of the issue; please correct me if I'm wrong. Because there are only two distinct "dt" values, hive will only allocate a single reducer to do the cartesian product per distinct "dt" value. And this problem doesn't necessarily have anything to do with skewed data, right? Suppose I have a very large dataset with an even distribution of x join key values that all produce a lot of join output tuples; hive will have x reducers that will all run very slowly. And the solution for this problem is really for hive to have multiple reducers computing the cartesian product for each distinct join key value, correct?

On Wed, Nov 11, 2009 at 11:16 AM, Ning Zhang wrote:

I think it depends on how many rows there are in each table and what the distribution of the join keys is. I suspect that your data are very skewed, so that a lot of rows in table A have the same join key as a lot of rows in table B. That will produce a huge number of rows as the join result. Hive currently has at most 1 reducer for each distinct join key, so it may be very slow for this reducer. There are some JIRAs created for this problem.
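The "at most 1 reducer per distinct join key" behavior and the "grouping ID" idea discussed above can be illustrated with a small Python sketch. This is a toy model under stated assumptions, not Hive's implementation: the function names, the round-robin salt assignment, and the sample dt values are all hypothetical. Each group in the model is a unit of work that a single reducer must process on its own.

```python
from collections import defaultdict

def plain_groups(left, right):
    """Group both sides by join key only: one group per distinct key."""
    groups = defaultdict(lambda: ([], []))
    for key, val in left:
        groups[key][0].append(val)
    for key, val in right:
        groups[key][1].append(val)
    return groups

def salted_groups(left, right, num_salts):
    """Add a grouping ID (salt) to the key. Left rows are spread round-robin
    over the salts per key; right rows are replicated to every salt, so each
    (key, salt) group yields a disjoint slice of the per-key cartesian product."""
    groups = defaultdict(lambda: ([], []))
    seen = defaultdict(int)  # per-key row counter for the round-robin salt
    for key, val in left:
        groups[(key, seen[key] % num_salts)][0].append(val)
        seen[key] += 1
    for key, val in right:
        for salt in range(num_salts):
            groups[(key, salt)][1].append(val)
    return groups

def output_rows(groups):
    """Total joined rows across all groups."""
    return sum(len(l) * len(r) for l, r in groups.values())

def largest_group(groups):
    """Joined rows produced by the single busiest group."""
    return max(len(l) * len(r) for l, r in groups.values())

# Hypothetical data: 1000 rows split evenly over two dt values.
rows = [("2009-11-09" if i % 2 else "2009-11-10", i) for i in range(1000)]
plain = plain_groups(rows, rows)
salted = salted_groups(rows, rows, num_salts=8)

print(len(plain), len(salted))                    # 2 groups vs 16 groups
print(output_rows(plain), output_rows(salted))    # 500000 in both cases
print(largest_group(plain), largest_group(salted))  # 250000 vs 31500
```

The join result is the same size either way, but the work is spread over more, smaller groups. The cost in this model is that the right side is copied num_salts times, which is one reason the change is not trivial.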
I don't know if anyone is actively working on this, but it would make a great research project.

Another way you may be able to optimize is to rewrite your query to use semijoin, if you can. Semijoin was just checked in to trunk, so you probably need to check out trunk to try it. If you can rewrite the query using semijoin, you can avoid the cartesian product.

The basic idea of semijoin is to implement the semantics of IN/EXISTS subqueries, which Hive doesn't support yet. If you have a SQL query like:

select m1.aid, m1.dt, count(1)
from m1 m1
where exists (select null from m1 m2 where m1.aid = m2.aid and m1.dt = m2.dt)
group by m1.aid, m1.dt;

you can rewrite it using left semi join in HiveQL as:

select m1.aid, m1.dt, count(1)
from m1 m1 left semi join m1 m2 on (m1.aid = m2.aid and m1.dt = m2.dt)
group by m1.aid, m1.dt;

Note that you cannot 'select' any columns from m2, the right-hand-side table of the left semi join, just as in the above exists subquery you cannot reference inner query tables from the outer query. The benefit of using semijoin rather than inner join is that if there are a lot of rows with the same join key in m1 and m2, it will return after the first match, without doing a cartesian product of all matching rows in m1 and m2.

Of course, whether you can rewrite this depends on your application semantics. If you really want all the combinations of the rows from m1 and m2, semijoin won't help you.

Ning

On Nov 11, 2009, at 10:45 AM, Ryan LeCompte wrote:

I have two sequence file tables with 10GB each, and it's exhibiting the same problem of the final reducer just never finishing. Any ideas there? Have you tried joining across such large tables?

Ryan

On Nov 11, 2009, at 1:37 PM, Ning Zhang wrote:

The problem is in the data loading process: the m1 file is in plain text CSV format and you are loading it into a Hive table with the default setting, which assumes fields are separated by Ctrl-A.
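The effect of that default delimiter on a comma-separated file can be mimicked with a short Python sketch. It is a simplified model of delimited-text parsing, not Hive's actual SerDe code: the helper names and the sample row are hypothetical, and the only fact taken from the thread is the column layout (mid int, aid int, dt string).

```python
from typing import Optional, Tuple

CTRL_A = "\x01"  # the Ctrl-A character assumed as the default field separator

def to_int(field: Optional[str]) -> Optional[int]:
    """Return the field as an int, or None (NULL) if it is missing or not a number."""
    try:
        return int(field)
    except (TypeError, ValueError):
        return None

def load_row(line: str, delimiter: str) -> Tuple[Optional[int], Optional[int], Optional[str]]:
    """Split one text line into (mid, aid, dt); missing columns become None."""
    fields = line.split(delimiter)[:3]
    fields += [None] * (3 - len(fields))
    # With the wrong delimiter the whole line lands in the first (int) column,
    # fails to parse as a number, and the other two columns are simply absent.
    return to_int(fields[0]), to_int(fields[1]), fields[2]

line = "17,42,2009-11-10"  # hypothetical CSV row

print(load_row(line, CTRL_A))  # (None, None, None)
print(load_row(line, ","))     # (17, 42, '2009-11-10')
```

In this model every row of the CSV file loads as all NULLs when split on Ctrl-A, so every row carries the same join key.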
So if you look at the first 10 rows, all fields are NULL, since Hive cannot find Ctrl-A in a row. So your query is actually doing a cartesian product of 100k x 100k rows. Since the 'Load data' command neither checks the input format nor transforms it, you need to specify the input format in the create table DDL. The following is a working example. It finishes in my unit test (single machine) in less than 3 mins.

8<--------
drop table m1;
drop table foo1;
create table m1 ( mid int, aid int, dt string)
row format delimited fields terminated by ',' stored as textfile;
LOAD DATA LOCAL INPATH '../data/files/m1' OVERWRITE INTO TABLE m1;
select * from m1 limit 10;
create table foo1 ( aid_1 int, aid_2 int, mid bigint, dt bigint );
insert overwrite table foo1
select m1.aid as aid_1, m2.aid as aid_2, count(1), m1.dt as dt
from m1 m1 join m1 m2 on m1.aid = m2.aid and m1.dt = m2.dt
group by m1.aid, m2.aid, m1.dt;
8<------------

On Nov 10, 2009, at 11:19 PM, Defenestrator wrote:

Definitely the join portion of the plan.
The one reduce job takes over 2 1/2 hours to complete doing the following:

2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 1 rows
2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 1 rows
2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 10 rows
2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 10 rows
2009-11-10 18:14:30,311 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 100 rows
2009-11-10 18:14:30,311 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 100 rows
2009-11-10 18:14:30,328 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 1000 rows
2009-11-10 18:14:30,328 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 1000 rows
2009-11-10 18:14:30,445 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 10000 rows
2009-11-10 18:14:30,446 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 10000 rows
2009-11-10 18:14:30,560 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 100000 rows
2009-11-10 18:14:30,560 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 100000 rows
2009-11-10 18:14:31,431 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 1000000 rows
2009-11-10 18:14:31,431 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 1000000 rows
2009-11-10 18:14:32,384 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 2000000 rows
...
2009-11-10 20:53:19,459 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 9999000000 rows
2009-11-10 20:53:19,459 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 9999000000 rows
2009-11-10 20:53:20,374 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 10000000000 rows
2009-11-10 20:53:20,374 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 10000000000 rows

On Tue, Nov 10, 2009 at 9:07 PM, Namit Jain wrote:

I think you missed the attachment.

Which job is taking more time – join or group by?

Can you send the data characteristics for m1 and foo1? Is it possible that there is a large skew on aid and dt which is forcing the data to be sent to a single reducer?

-namit

On 11/10/09 6:35 PM, "Defenestrator" wrote:

I would definitely appreciate any insights on this from the list. I tried to reduce the query down to something that is easily understood, and hive still demonstrates pretty poor join performance on a three-node hadoop cluster.

drop table m1;
drop table foo1;
create table m1 ( mid int, aid int, dt string);
LOAD DATA LOCAL INPATH 'm1' OVERWRITE INTO TABLE m1;
create table foo1 ( aid_1 int, aid_2 int, mid bigint, dt bigint );
set mapred.reduce.tasks=32;
insert overwrite table foo1
select m1.aid as aid_1, m2.aid as aid_2, count(1), m1.dt as dt
from m1 m1 join m1 m2 on m1.aid = m2.aid and m1.dt = m2.dt
group by m1.aid, m2.aid, m1.dt;

Attached is the file I'm using, which only has 100k rows. I've looked at the benchmark (http://issues.apache.org/jira/secure/attachment/12411185/hive_benchmark_2009-06-18.pdf) and hive seems to be able to join much bigger data sets.
And I tried running the same query on a single-node dbms on my desktop, and it's able to return results in less than 3 minutes, while hive has been running for at least 20 minutes now.

Thanks.

On Tue, Nov 10, 2009 at 3:53 PM, Ryan LeCompte wrote:

Any thoughts on this? I've only had luck by reducing the data on each side of the join. Is this something Hive might be able to improve in a future release of the query plan optimization?

Thanks,
Ryan

On Nov 3, 2009, at 10:55 PM, Ryan LeCompte wrote:

I've had a similar issue with a small cluster. Is there any way that you can reduce the size of the data being joined on both sides? If you search the forums for join issue, you will see the thread for my issue and get some tips.

Thanks,
Ryan

On Nov 3, 2009, at 10:45 PM, Defenestrator wrote:

I was able to increase the number of reduce jobs manually to 32. However, it finishes 28 of them and the other 4 have the same behavior of using 100% cpu and consuming a lot of memory. I suspect that it might be an issue with the reduce job itself. Is there a way to figure out what these jobs are doing exactly?

Thanks.

On Tue, Nov 3, 2009 at 6:53 PM, Namit Jain wrote:

The number of reducers is inferred from the input data size.
But you can always override it by setting mapred.reduce.tasks.

From: Defenestrator
Sent: Tuesday, November 03, 2009 6:46 PM
To: [email protected]
Subject: Re: Self join problem

Hi Namit,

Thanks for your suggestion. I tried changing the query as you had suggested by moving the m1.dt = m2.dt to the on clause. It increased the number of reduce jobs to 2. So now there are two processes running on two nodes at 100% cpu, consuming a lot of memory. Is there a reason why hive doesn't spawn more reduce jobs for this query?

On Tue, Nov 3, 2009 at 4:47 PM, Namit Jain wrote:

Get the join condition in the on condition:

insert overwrite table foo1
select m1.id as id_1, m2.id as id_2, count(1), m1.dt
from m1 join m2 on m1.dt=m2.dt
where m1.id <> m2.id and m1.id < m2.id
group by m1.id, m2.id, m1.dt;

From: Defenestrator
Sent: Tuesday, November 03, 2009 4:44 PM
To: [email protected]
Subject: Self join problem

Hello,

I'm trying to run the following query where m1 and m2 have the same data (>29M rows) on a 3-node hadoop cluster. I'm essentially trying to do a self join. It ends up running 269 map jobs and 1 reduce job. The map jobs complete, but the reduce job just runs in one process on one of the hadoop nodes at 100% cpu utilization and slowly increases in memory consumption. The reduce job never goes beyond 82% complete despite letting it run for a day.

I am running on 0.5.0 based on this morning's trunk.

insert overwrite table foo1
select m1.id as id_1, m2.id as id_2, count(1), m1.dt
from m1 join m2
where m1.id <> m2.id and m1.id < m2.id and m1.dt = m2.dt
group by m1.id, m2.id, m1.dt;

Any input would be appreciated.
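For a sense of scale, the number of rows such a join produces can be estimated from per-key row counts: a join key shared by n rows on each side contributes n * n joined rows before the where clause filters anything. The Python sketch below does that arithmetic. The function name is hypothetical, and the even split of the >29M rows across two dt values is an assumption made for illustration, not a figure reported in the thread.

```python
from typing import Iterable

def self_join_rows(rows_per_key: Iterable[int]) -> int:
    """Rows produced by a self equi-join, given the row count of each distinct key."""
    return sum(n * n for n in rows_per_key)

# The 100k-row test file where every row ended up in a single join group.
print(self_join_rows([100_000]))                 # 10000000000

# At least 29M rows, assumed to be split evenly over two dt values.
print(self_join_rows([14_500_000, 14_500_000]))  # 420500000000000

# For comparison: the same 29M rows spread over 29M distinct keys.
print(self_join_rows([1] * 10) * 2_900_000)      # 29000000
```

The first figure matches the last "forwarding 10000000000 rows" entry in the reducer log quoted earlier in the thread.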
