Ning,

Prasad Chakka on the ##hive IRC channel was able to help me achieve what I
wanted with this small cluster... he suggested that if the two tables have a
lot of common values (which they do), that I should first reduce each side
of the join by doing a distinct on the column in the sub-queries and then do
a join on these two reduced sub-queries... Here's what it looks like:

SELECT COUNT(DISTINCT q1.col) FROM
   (SELECT DISTINCT col FROM table1) q1
   JOIN (SELECT DISTINCT col FROM table2) q2
   ON (q1.col = q2.col);

Hive produces 4 map/reduce jobs from the above queries and the whole thing
finishes in 15 minutes, which is very acceptable given my small cluster.

Thanks!!!!

Ryan


On Mon, Oct 26, 2009 at 2:25 PM, Ning Zhang <[email protected]> wrote:

> Changing it won't help in your case. This number is used for calculating
> number of reducers by Hive. It is a rule-of-thumb of how large the input
> should be for each reducer. If your workload exceeds this number, you
> probably want to consider scaling out your cluster.
>
> Ning
>
> On Oct 26, 2009, at 11:14 AM, Ryan LeCompte wrote:
>
> Ah, I see. And it's probably not worth trying to change the
> hive.exec.reducers.bytes.per.reducer to something smaller, right?
>
>
> On Mon, Oct 26, 2009 at 2:12 PM, Ning Zhang <[email protected]> wrote:
>
>> Yes, 4 nodes probably is too small for this. The default value of bytes
>> per reducer is 1GB (see hive.exec.reducers.bytes.per.reducer in
>> hive-default.xml).
>>
>>
>> Ning
>>
>> On Oct 26, 2009, at 10:56 AM, Ryan LeCompte wrote:
>>
>> Ning,
>>
>> Thanks for the explanation. I was able to take your example semijoin query
>> and adapt it to my use case. Hive builds 3 map/reduce jobs from the query.
>> The first one completes, but the second one still times out on the reducer.
>> Perhaps my cluster is too small for this? It's only 4 nodes.
>>
>>
>> On Mon, Oct 26, 2009 at 1:47 PM, Ning Zhang <[email protected]> wrote:
>>
>>> This query is using the regular join to simulate semijoin. The proposed
>>> semijoin syntax is something like:
>>>
>>> select A.id from A left semi join B on A.id = B.id;
>>>
>>> Semantically semijoin is the same as the simulated query using JOIN. but
>>> proposed semijoin operation is more efficient in that:
>>> 1) the simulation has a group by which requires a separate map-reduce
>>> task. In the semijoin, the group by is map-side only so that we can
>>> eliminate one map-reduce task.
>>> 2) the semijoin implements the early-exit semantics: whenever a match is
>>> found on B, the rest of the tuples in B will be omitted. While the JOIN will
>>> keep matching even though a match is found.
>>>
>>>  Thanks,
>>> Ning
>>>
>>> The difference between the semijoin implementation and the simulation
>>> using regular join is that the group by in the
>>>
>>>
>>>
>>> On Oct 26, 2009, at 9:56 AM, Ryan LeCompte wrote:
>>>
>>> I got the query working (my bad syntax caused it to error out)...
>>> However, can you please explain what Hive is doing in your example query for
>>> the semijoin?
>>>
>>> Thanks,
>>> Ryan
>>>
>>> On Mon, Oct 26, 2009 at 12:52 PM, Ryan LeCompte <[email protected]>wrote:
>>>
>>>> Also, if I try your query I get:
>>>>
>>>> FAILED: Parse Error: line 1:8 cannot recognize input 'DISTINCT' in
>>>> expression specification
>>>>
>>>>
>>>> Seems like it doesn't like SELECT (DISTINCT ...)
>>>>
>>>>
>>>>
>>>> On Mon, Oct 26, 2009 at 12:47 PM, Ryan LeCompte <[email protected]>wrote:
>>>>
>>>>> Ah, got it!
>>>>>
>>>>> Can you explain your semijoin query though? I see that you are using
>>>>> JOIN but I don't see the ON (...) part. What does Hive do in this case 
>>>>> when
>>>>> ON (..) doesn't exist?
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Oct 26, 2009 at 12:41 PM, Ning Zhang <[email protected]>wrote:
>>>>>
>>>>>> I see. If you just want all userid in one table that also appear in
>>>>>> another table, semijoin would help (it is still under implementation
>>>>>> though). You can simulate semijoin using JOIN as follows:
>>>>>>
>>>>>> select (distinct ut.userid) from usertracking ut join (select
>>>>>> usertrackingid from streamtransfers group by usertrackingid where
>>>>>> usertrackingid is not null and usertrackingid<>0) where userid is not 
>>>>>> null
>>>>>> and userid <> 0;
>>>>>>
>>>>>> BTW, the rewritten query using UNION ALL is not semantically
>>>>>> equivalent to the original query: it doesn't return "common" user IDs in
>>>>>> both table, but just the "union" of them.
>>>>>>
>>>>>> Ning
>>>>>>
>>>>>> On Oct 26, 2009, at 9:33 AM, Ryan LeCompte wrote:
>>>>>>
>>>>>> Hi Ning,
>>>>>>
>>>>>> Basically, I have roughly 60GB of log data that is logically broken up
>>>>>> into two Hive tables, which works out to be those two tables that I had
>>>>>> mentioned. Both of these tables share a common key, but this key appears 
>>>>>> in
>>>>>> virtually every row of each of the two tables. Each of these tables just 
>>>>>> has
>>>>>> 1 partition (by date). My query is very similar (although with different
>>>>>> data/columns) to Chris Bates' query:
>>>>>>
>>>>>>
>>>>>> SELECT COUNT(DISTINCT UT.UserID) FROM usertracking UT JOIN
>>>>>> streamtransfers ST ON (ST.usertrackingid = UT.usertrackingid) WHERE
>>>>>> UT.UserID IS NOT NULL AND UT.UserID <> 0;
>>>>>>
>>>>>> However, in the above example imagine that in both tables the same
>>>>>> users appear a lot, so there are lots of matches.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Oct 26, 2009 at 12:27 PM, Ning Zhang <[email protected]>wrote:
>>>>>>
>>>>>>> If it is really a Cartesian product, there is no better way other
>>>>>>> than increasing the timeout for the reducers. You can do a
>>>>>>> back-of-the-envelope calculation on how long it takes (e.g., in your 
>>>>>>> log it
>>>>>>> shows it takes 19 sec. to get 6 million rows out of the join). This
>>>>>>> calculation can also be done if it is not a Cartesian product, and you 
>>>>>>> have
>>>>>>> an good estimate of how many rows will be produced.
>>>>>>>
>>>>>>> In general, joining two huge tables can be avoided by partitioning
>>>>>>> the fact tables, so that you don't need to join the whole table.
>>>>>>>
>>>>>>> BTW, are you joining two fact tables or one dimension table is just
>>>>>>> huge?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Ning
>>>>>>>
>>>>>>> On Oct 26, 2009, at 5:16 AM, Ryan LeCompte wrote:
>>>>>>>
>>>>>>> So it turns out that the JOIN key of my query basically results in a
>>>>>>> match/join on all rows of each table! There really is no extra filtering
>>>>>>> that I can do to exclude invalid rows, etc. The mappers fly by and 
>>>>>>> complete,
>>>>>>> but the reducers are just moving extremely slowly (my guess due to what
>>>>>>> Zheng said about the Cartesian product of all rows getting matched).
>>>>>>>
>>>>>>> Is there some other way that I could re-write the JOIN or is my only
>>>>>>> option to increase the timeout on the task trackers so that they don't
>>>>>>> timeout/kill the reducers? I've already upped their timeouts to 30 
>>>>>>> minutes
>>>>>>> (as opposed to the default of 10), and it doesn't seem to be 
>>>>>>> sufficient...
>>>>>>> Again, this is joining a 33GB table with a 13GB table where join key is
>>>>>>> shared by virtually all rows in both tables.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Ryan
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Oct 26, 2009 at 7:35 AM, Ryan LeCompte 
>>>>>>> <[email protected]>wrote:
>>>>>>>
>>>>>>>> Thanks guys, very useful information. I will modify my query a bit
>>>>>>>> and get back to you guys on whether it worked or not.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Ryan
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Oct 26, 2009 at 4:34 AM, Chris Bates <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Ryan,
>>>>>>>>>
>>>>>>>>> I asked this question a couple days ago but in a slightly different
>>>>>>>>> form.  What you have to do is make sure the table you're joining is 
>>>>>>>>> smaller
>>>>>>>>> than the leftmost table.  As an example,
>>>>>>>>>
>>>>>>>>> SELECT COUNT(DISTINCT UT.UserID) FROM usertracking UT JOIN
>>>>>>>>> streamtransfers ST ON (ST.usertrackingid = UT.usertrackingid) WHERE
>>>>>>>>> UT.UserID IS NOT NULL AND UT.UserID <> 0;
>>>>>>>>>
>>>>>>>>> In this query, usertracking is a table that is about 8 or 9 gigs.
>>>>>>>>>  Streamtransfers is a table that is about 4 gigs.  As per Zheng's 
>>>>>>>>> comment, I
>>>>>>>>> omitted UserID's of Null or Zero as there are many rows with this key 
>>>>>>>>> and
>>>>>>>>> the join worked as intended.
>>>>>>>>>
>>>>>>>>> Chris
>>>>>>>>>
>>>>>>>>> PS. As an aside, Hive is proving to be quite useful to all of our
>>>>>>>>> database hackers here at Grooveshark.  Thanks to everyone who has
>>>>>>>>> committed...I hope to contribute soon.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Oct 26, 2009 at 2:08 AM, Zheng Shao <[email protected]>wrote:
>>>>>>>>>
>>>>>>>>>> It's probably caused by the Cartesian product of many rows from
>>>>>>>>>> the two tables with the same key.
>>>>>>>>>>
>>>>>>>>>> Zheng
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sun, Oct 25, 2009 at 7:22 PM, Ryan LeCompte <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> It also looks like the reducers just never stop outputting things
>>>>>>>>>>> likethe (following  -- see below), causing them to ultimately time 
>>>>>>>>>>> out and
>>>>>>>>>>> get killed by the system.
>>>>>>>>>>>
>>>>>>>>>>> 2009-10-25 22:21:18,879 INFO 
>>>>>>>>>>> org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 
>>>>>>>>>>> 100000000 rows
>>>>>>>>>>>
>>>>>>>>>>> 2009-10-25 22:21:22,009 INFO 
>>>>>>>>>>> org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 101000000 
>>>>>>>>>>> rows
>>>>>>>>>>> 2009-10-25 22:21:22,010 INFO 
>>>>>>>>>>> org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 
>>>>>>>>>>> 101000000 rows
>>>>>>>>>>> 2009-10-25 22:21:25,141 INFO 
>>>>>>>>>>> org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 102000000 
>>>>>>>>>>> rows
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2009-10-25 22:21:25,142 INFO 
>>>>>>>>>>> org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 
>>>>>>>>>>> 102000000 rows
>>>>>>>>>>> 2009-10-25 22:21:28,263 INFO 
>>>>>>>>>>> org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 103000000 
>>>>>>>>>>> rows
>>>>>>>>>>> 2009-10-25 22:21:28,263 INFO 
>>>>>>>>>>> org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 
>>>>>>>>>>> 103000000 rows
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2009-10-25 22:21:31,387 INFO 
>>>>>>>>>>> org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 104000000 
>>>>>>>>>>> rows
>>>>>>>>>>> 2009-10-25 22:21:31,387 INFO 
>>>>>>>>>>> org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 
>>>>>>>>>>> 104000000 rows
>>>>>>>>>>> 2009-10-25 22:21:34,510 INFO 
>>>>>>>>>>> org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 105000000 
>>>>>>>>>>> rows
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2009-10-25 22:21:34,510 INFO 
>>>>>>>>>>> org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 
>>>>>>>>>>> 105000000 rows
>>>>>>>>>>> 2009-10-25 22:21:37,633 INFO 
>>>>>>>>>>> org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 106000000 
>>>>>>>>>>> rows
>>>>>>>>>>> 2009-10-25 22:21:37,633 INFO 
>>>>>>>>>>> org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 
>>>>>>>>>>> 106000000 rows
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Oct 25, 2009 at 9:39 PM, Ryan LeCompte <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>
>>>>>>>>>>>> Should I expect to be able to do a Hive JOIN between two tables
>>>>>>>>>>>> that have about 10 or 15GB of data each? What I'm noticing (for a 
>>>>>>>>>>>> simple
>>>>>>>>>>>> JOIN) is that all the map tasks complete, but the reducers just 
>>>>>>>>>>>> hang at
>>>>>>>>>>>> around 87% or so (for the first set of 4 reducers), and then they 
>>>>>>>>>>>> eventually
>>>>>>>>>>>> just get killed due to inability to respond by the cluster. I can 
>>>>>>>>>>>> do a JOIN
>>>>>>>>>>>> between a large table and a very small table of 10 or so records 
>>>>>>>>>>>> just fine.
>>>>>>>>>>>>
>>>>>>>>>>>> Any thoughts?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Ryan
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Yours,
>>>>>>>>>> Zheng
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
>>
>
>

Reply via email to