Ning, can you explain how it comes up with the estimate at compile time, if the # of distinct join keys is known only at runtime? Is it just based on the number of HDFS blocks of the relations being joined?
Thanks.

On Wed, Nov 11, 2009 at 9:21 PM, Ning Zhang <[email protected]> wrote:

> Your understanding is mostly correct. But the number of reducers is
> estimated at compile time. It is not a strict 1:1 mapping between distinct
> join keys and # of reducers, since the stats are not available at compile
> time now.
>
> Your solution is correct in theory, but it is not trivial to partition
> mapper output with the same key to different reducers. It would probably
> need to add a "grouping ID" to the sorting key before sending rows to the
> reducers.
>
> Ning
>
> On Nov 11, 2009, at 12:21 PM, Defenestrator wrote:
>
> Thanks for the explanation, Ning. This explains the behavior I'm
> seeing with the original query I was trying to run, which was doing a
> cartesian product for each dt (of which there are only 2), and two reducers
> were running very slowly.
>
> insert overwrite table foo1
> select m1.aid as aid_1, m2.aid as aid_2, count(1), m1.dt as dt
> from m1 m1 join m1 m2 on m1.dt = m2.dt
> where m1.aid <> m2.aid and m1.aid < m2.aid
> group by m1.aid, m2.aid, m1.dt;
>
> Here's my understanding of the issue; please correct me if I'm wrong.
> Because there are only two distinct "dt" values, Hive will only allocate a
> single reducer to do the cartesian product per distinct "dt" value.
>
> And this problem doesn't necessarily have anything to do with skewed data,
> right? Suppose I have a very large dataset with an even distribution of
> x join key values that all produce a lot of join output tuples; Hive will
> have x reducers that will all run very slowly. And the solution for this
> problem is really for Hive to have multiple reducers computing the cartesian
> product for each distinct join key value, correct?
>
> On Wed, Nov 11, 2009 at 11:16 AM, Ning Zhang <[email protected]> wrote:
>
>> I think it depends on how many rows there are in each table and what the
>> distribution of the join keys is. I suspect that your data are very
>> skewed, so that a lot of rows in table A have the same join key as a lot
>> of rows in table B. That will produce a huge number of rows as the join
>> result. Hive currently has at most 1 reducer for each distinct join key,
>> so that reducer may be very slow. There are some JIRAs created for this
>> problem. I don't know if anyone is actively working on it, but it should
>> be a great research project.
>>
>> Another way you may be able to optimize is to rewrite your query to use a
>> semijoin if you can. Semijoin was just checked in to trunk, so you probably
>> need to check out trunk and try it out. If you can rewrite the query using
>> a semijoin, you can avoid the cartesian product. The basic idea of semijoin
>> is to implement the semantics of IN/EXISTS subqueries, which Hive doesn't
>> support yet. If you have a SQL query like:
>>
>> select m1.aid, m1.dt, count(1)
>> from m1 m1
>> where exists (select null from m1 m2 where m1.aid = m2.aid and m1.dt = m2.dt)
>> group by m1.aid, m1.dt;
>>
>> you can rewrite it using left semi join in HiveQL as
>>
>> select m1.aid, m1.dt, count(1)
>> from m1 m1 left semi join m1 m2 on (m1.aid = m2.aid and m1.dt = m2.dt)
>> group by m1.aid, m1.dt;
>>
>> Note that you cannot 'select' any columns from m2, the right-hand-side
>> table of the left semi join, just as in the above EXISTS subquery you
>> cannot reference inner-query tables from the outer query.
>>
>> The benefit of using a semijoin rather than an inner join is that if there
>> are a lot of rows with the same join key in m1 and m2, it will return after
>> the first match, without doing a cartesian product of all matching rows in
>> m1 and m2.
>>
>> Of course, whether you can rewrite this depends on your application
>> semantics. If you really want all the combinations of the rows from m1 AND
>> m2, semijoin won't help you.
>>
>> Ning
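For illustration only (Hive does not do this automatically; the replication factor of 2, the derived column name "salt", and the use of rand() are made up for this sketch, and rand() makes row placement nondeterministic across task retries): the "grouping ID" idea mentioned at the top of the thread can be approximated by hand in HiveQL, salting one side of the join and replicating the other side once per salt value, so a single hot dt value is spread over several reducers instead of one.

-- each (a-row, b-row) pair with equal dt still matches exactly once,
-- but the shuffle key becomes (dt, salt), so one hot dt maps to 2 reducers
insert overwrite table foo1
select a.aid as aid_1, b.aid as aid_2, count(1), a.dt as dt
from
  (select aid, dt, cast(floor(rand() * 2) as int) as salt from m1) a
join
  (select aid, dt, 0 as salt from m1
   union all
   select aid, dt, 1 as salt from m1) b
on (a.dt = b.dt and a.salt = b.salt)
where a.aid < b.aid
group by a.aid, b.aid, a.dt;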
>>
>> On Nov 11, 2009, at 10:45 AM, Ryan LeCompte wrote:
>>
>> I have two sequence file tables with 10GB each, and it's exhibiting the
>> same problem of the final reducer just never finishing.
>>
>> Any ideas there? Have you tried joining across such large tables?
>>
>> Ryan
>>
>> On Nov 11, 2009, at 1:37 PM, Ning Zhang <[email protected]> wrote:
>>
>> The problem is in the data loading process: the m1 file is plain-text
>> CSV, and you are loading it into a Hive table with the default settings,
>> which assume fields are separated by ctrl-A. So if you look at the first
>> 10 rows, all fields are NULL, since Hive cannot find a ctrl-A in any row.
>> Your query is actually doing a cartesian product of 100k x 100k rows.
>>
>> Since the LOAD DATA command doesn't check the input format, nor does it
>> transform the format, you need to specify the input format in the CREATE
>> TABLE DDL. Following is a working example. It finishes in my unit test
>> (single machine) in less than 3 mins.
>>
>> 8<--------
>> drop table m1;
>> drop table foo1;
>>
>> create table m1 (
>>   mid int,
>>   aid int,
>>   dt string)
>> row format delimited fields terminated by ','
>> stored as textfile;
>>
>> LOAD DATA LOCAL INPATH '../data/files/m1' OVERWRITE INTO TABLE m1;
>>
>> select * from m1 limit 10;
>>
>> create table foo1 (
>>   aid_1 int,
>>   aid_2 int,
>>   mid bigint,
>>   dt bigint
>> );
>>
>> insert overwrite table foo1
>> select m1.aid as aid_1, m2.aid as aid_2, count(1), m1.dt as dt
>> from m1 m1 join m1 m2 on m1.aid = m2.aid and m1.dt = m2.dt
>> group by m1.aid, m2.aid, m1.dt;
>> 8<------------
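A quick sanity check for the symptom Ning describes (every field NULL because the table's delimiter doesn't match the file) is to count rows whose join key failed to parse; this is just a sketch against the m1 table defined above:

-- if this returns the full row count, the delimiter in the DDL does not match the data file
select count(1) from m1 where aid is null;

Ning's "select * from m1 limit 10" shows the same thing visually; the count just makes it unambiguous on a large file.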
>> On Nov 10, 2009, at 11:19 PM, Defenestrator wrote:
>>
>> Definitely the join portion of the plan. The one reduce job takes over
>> 2 1/2 hours to complete, doing the following:
>>
>> 2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 1 rows
>> 2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 1 rows
>> 2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 10 rows
>> 2009-11-10 18:14:30,309 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 10 rows
>> 2009-11-10 18:14:30,311 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 100 rows
>> 2009-11-10 18:14:30,311 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 100 rows
>> 2009-11-10 18:14:30,328 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 1000 rows
>> 2009-11-10 18:14:30,328 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 1000 rows
>> 2009-11-10 18:14:30,445 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 10000 rows
>> 2009-11-10 18:14:30,446 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 10000 rows
>> 2009-11-10 18:14:30,560 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 100000 rows
>> 2009-11-10 18:14:30,560 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 100000 rows
>> 2009-11-10 18:14:31,431 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 1000000 rows
>> 2009-11-10 18:14:31,431 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 1000000 rows
>> 2009-11-10 18:14:32,384 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 2000000 rows
>>
>> ...
>>
>> 2009-11-10 20:53:19,459 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 9999000000 rows
>> 2009-11-10 20:53:19,459 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 9999000000 rows
>> 2009-11-10 20:53:20,374 INFO org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 10000000000 rows
>> 2009-11-10 20:53:20,374 INFO org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 10000000000 rows
>>
>> On Tue, Nov 10, 2009 at 9:07 PM, Namit Jain <[email protected]> wrote:
>>
>>> I think you missed the attachment.
>>>
>>> Which job is taking more time – join or group by?
>>>
>>> Can you send the data characteristics for m1 and foo1 – is it possible
>>> that there is a large skew on aid and dt which is forcing the data to be
>>> sent to a single reducer?
>>>
>>> -namit
>>>
>>> On 11/10/09 6:35 PM, "Defenestrator" <[email protected]> wrote:
>>>
>>> I would definitely appreciate any insights on this from the list. I
>>> tried to reduce the query down to something that is easily understood, and
>>> Hive still demonstrates pretty poor join performance on a three-node
>>> Hadoop cluster.
>>>
>>> drop table m1;
>>> drop table foo1;
>>>
>>> create table m1 (
>>>   mid int,
>>>   aid int,
>>>   dt string);
>>>
>>> LOAD DATA LOCAL INPATH 'm1' OVERWRITE INTO TABLE m1;
>>>
>>> create table foo1 (
>>>   aid_1 int,
>>>   aid_2 int,
>>>   mid bigint,
>>>   dt bigint
>>> );
>>>
>>> set mapred.reduce.tasks=32;
>>>
>>> insert overwrite table foo1
>>> select m1.aid as aid_1, m2.aid as aid_2, count(1), m1.dt as dt
>>> from m1 m1 join m1 m2 on m1.aid = m2.aid and m1.dt = m2.dt
>>> group by m1.aid, m2.aid, m1.dt;
>>>
>>> Attached is the file I'm using; it only has 100k rows.
>>> I've looked at the benchmark
>>> (http://issues.apache.org/jira/secure/attachment/12411185/hive_benchmark_2009-06-18.pdf),
>>> and Hive seems to be able to join much bigger data sets. I also tried
>>> running the same query on a single-node DBMS on my desktop, and it returns
>>> results in less than 3 minutes, while Hive has been running for at least
>>> 20 minutes now.
>>>
>>> Thanks.
>>>
>>> On Tue, Nov 10, 2009 at 3:53 PM, Ryan LeCompte <[email protected]> wrote:
>>>
>>> Any thoughts on this? I've only had luck by reducing the data on each
>>> side of the join. Is this something Hive might be able to improve in a
>>> future release of the query plan optimization?
>>>
>>> Thanks,
>>> Ryan
>>>
>>> On Nov 3, 2009, at 10:55 PM, Ryan LeCompte <[email protected]> wrote:
>>>
>>> I've had a similar issue with a small cluster. Is there any way that
>>> you can reduce the size of the data being joined on both sides? If you
>>> search the forums for "join issue", you will see the thread for my issue
>>> and get some tips.
>>>
>>> Thanks,
>>> Ryan
>>>
>>> On Nov 3, 2009, at 10:45 PM, Defenestrator <[email protected]> wrote:
>>>
>>> I was able to increase the number of reduce jobs manually to 32.
>>> However, it finishes 28 of them, and the other 4 show the same behavior of
>>> using 100% CPU and consuming a lot of memory. I suspect it might be an
>>> issue with the reduce job itself - is there a way to figure out what these
>>> jobs are doing exactly?
>>>
>>> Thanks.
>>>
>>> On Tue, Nov 3, 2009 at 6:53 PM, Namit Jain <[email protected]> wrote:
>>>
>>> The number of reducers is inferred from the input data size. But you can
>>> always override it by setting mapred.reduce.tasks.
>>>
>>> From: Defenestrator [mailto:[email protected]]
>>> Sent: Tuesday, November 03, 2009 6:46 PM
>>> To: [email protected]
>>> Subject: Re: Self join problem
>>>
>>> Hi Namit,
>>>
>>> Thanks for your suggestion.
>>>
>>> I tried changing the query as you had suggested, by moving the m1.dt =
>>> m2.dt to the ON clause. It increased the number of reduce jobs to 2. So
>>> now there are two processes running on two nodes at 100% CPU, consuming a
>>> lot of memory. Is there a reason why Hive doesn't spawn more reduce jobs
>>> for this query?
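A rough sketch of the knobs involved here (the values are only illustrative, and exact defaults depend on the Hive build): unless the reducer count is pinned explicitly, Hive derives it from the input size, roughly total input bytes divided by a per-reducer byte target, capped by a maximum.

-- smaller per-reducer input chunk => a larger estimated reducer count
set hive.exec.reducers.bytes.per.reducer=256000000;
-- cap on the estimated count
set hive.exec.reducers.max=64;
-- or bypass the estimate entirely and force a fixed count
set mapred.reduce.tasks=32;

As discussed above, though, extra reducers only help when there are enough distinct join keys to spread across them; all rows sharing one join key still land on a single reducer.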
>>> On Tue, Nov 3, 2009 at 4:47 PM, Namit Jain <[email protected]> wrote:
>>>
>>> Put the join condition in the ON clause:
>>>
>>> insert overwrite table foo1
>>> select m1.id as id_1, m2.id as id_2, count(1), m1.dt
>>> from m1 join m2 on m1.dt = m2.dt
>>> where m1.id <> m2.id and m1.id < m2.id
>>> group by m1.id, m2.id, m1.dt;
>>>
>>> From: Defenestrator [mailto:[email protected]]
>>> Sent: Tuesday, November 03, 2009 4:44 PM
>>> To: [email protected]
>>> Subject: Self join problem
>>>
>>> Hello,
>>>
>>> I'm trying to run the following query, where m1 and m2 have the same data
>>> (>29M rows), on a 3-node Hadoop cluster. I'm essentially trying to do a
>>> self join. It ends up running 269 map jobs and 1 reduce job. The map jobs
>>> complete, but the reduce job just runs as one process on one of the Hadoop
>>> nodes at 100% CPU utilization and slowly increases its memory consumption.
>>> The reduce job never goes beyond 82% complete, despite letting it run for
>>> a day.
>>>
>>> I am running on 0.5.0 based on this morning's trunk.
>>>
>>> insert overwrite table foo1
>>> select m1.id as id_1, m2.id as id_2, count(1), m1.dt
>>> from m1 join m2
>>> where m1.id <> m2.id and m1.id < m2.id and m1.dt = m2.dt
>>> group by m1.id, m2.id, m1.dt;
>>>
>>> Any input would be appreciated.
