I have two sequence-file tables of 10GB each, and joining them exhibits
the same problem: the final reducer just never finishes.
Any ideas there? Have you tried joining across such large tables?
Ryan
On Nov 11, 2009, at 1:37 PM, Ning Zhang <[email protected]> wrote:
The problem is in the data loading process: the m1 file is a plain
text CSV format and you are loading it to a Hive table with the
default setting, which assumes fields are separated by ctl_A. So if
you look at the first 10 rows, all fields are NULL since Hive cannot
find ctl_A in a row. So your query is actually doing a cartesian
product of 100k x 100k rows.
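To illustrate the parsing behavior described above, here is a rough Python simulation. It is not Hive's actual SerDe code; the parse_row helper and the sample row are made up for illustration. It splits a line on a given delimiter and casts to the declared column types, turning anything missing or unparsable into None, much as Hive returns NULL:

```python
# Hypothetical simulation of delimited-text parsing; not Hive's real SerDe.
CTRL_A = "\x01"  # Hive's default field delimiter for text tables


def to_int(value):
    """Cast to int, or return None (standing in for NULL) on failure."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def parse_row(line, delimiter):
    """Split a line into the (mid int, aid int, dt string) columns of m1."""
    fields = line.split(delimiter)
    # Pad with None so that missing trailing columns come back as NULL.
    fields += [None] * (3 - len(fields))
    mid, aid, dt = fields[:3]
    return (to_int(mid), to_int(aid), dt)


sample = "17,42,2009-11-10"  # made-up CSV row

default_parse = parse_row(sample, CTRL_A)  # table created with default settings
csv_parse = parse_row(sample, ",")         # table created with ',' as delimiter
print(default_parse)
print(csv_parse)
```

With the default delimiter the whole line ends up in the first column, which fails the int cast, and the other two columns are simply missing. With ',' the three columns come out as expected.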
Since the 'Load data' command doesn't check the input format nor
does it transform the format, you need to specify the input format
in the create table DDL. Following is a working example. It finishes
in my unit test (single machine) in less than 3 mins.
8<--------
drop table m1;
drop table foo1;
create table m1 (
mid int,
aid int,
dt string)
row format delimited fields terminated by ','
stored as textfile;
LOAD DATA LOCAL INPATH '../data/files/m1' OVERWRITE INTO TABLE m1;
select * from m1 limit 10;
create table foo1 (
aid_1 int,
aid_2 int,
mid bigint,
dt bigint
);
insert overwrite table foo1
select m1.aid as aid_1, m2.aid as aid_2, count(1), m1.dt as dt
from m1 m1 join m1 m2 on m1.aid = m2.aid and m1.dt = m2.dt group by
m1.aid, m2.aid, m1.dt;
8<------------
On Nov 10, 2009, at 11:19 PM, Defenestrator wrote:
Definitely the join portion of the plan. The one reduce job takes
over 2 1/2 hours to complete doing the following:
2009-11-10 18:14:30,309 INFO
org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 1 rows
2009-11-10 18:14:30,309 INFO
org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 1 rows
2009-11-10 18:14:30,309 INFO
org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 10 rows
2009-11-10 18:14:30,309 INFO
org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 10 rows
2009-11-10 18:14:30,311 INFO
org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 100 rows
2009-11-10 18:14:30,311 INFO
org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 100 rows
2009-11-10 18:14:30,328 INFO
org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 1000 rows
2009-11-10 18:14:30,328 INFO
org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 1000 rows
2009-11-10 18:14:30,445 INFO
org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 10000 rows
2009-11-10 18:14:30,446 INFO
org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 10000
rows
2009-11-10 18:14:30,560 INFO
org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 100000 rows
2009-11-10 18:14:30,560 INFO
org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 100000
rows
2009-11-10 18:14:31,431 INFO
org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 1000000
rows
2009-11-10 18:14:31,431 INFO
org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 1000000
rows
2009-11-10 18:14:32,384 INFO
org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding 2000000
rows
...
2009-11-10 20:53:19,459 INFO
org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding
9999000000 rows
2009-11-10 20:53:19,459 INFO
org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 9999000000
rows
2009-11-10 20:53:20,374 INFO
org.apache.hadoop.hive.ql.exec.JoinOperator: 4 forwarding
10000000000 rows
2009-11-10 20:53:20,374 INFO
org.apache.hadoop.hive.ql.exec.SelectOperator: 5 forwarding 10000000000
rows
On Tue, Nov 10, 2009 at 9:07 PM, Namit Jain <[email protected]>
wrote:
I think you missed the attachment.
Which job is taking more time: the join or the group by?
Can you send the data characteristics for m1 and foo1? Is it
possible that there is a large skew on aid and dt which is forcing the
data to be sent to a single reducer?
-namit
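One way to gauge such skew before running the join: for an equi-join of a table with itself, a key that occurs n times contributes n * n output rows, so the total is the sum of the squared key frequencies. A small Python sketch of that arithmetic follows; the self_join_rows helper and the sample keys are made up for illustration:

```python
from collections import Counter


def self_join_rows(keys):
    """Rows produced by an equi self-join: sum of n * n over key frequencies."""
    return sum(n * n for n in Counter(keys).values())


# 100k rows, each with a distinct (aid, dt) key: one output row per input row.
distinct = [(i, "2009-11-10") for i in range(100_000)]

# 100k rows that all share a single key value: a full cross product.
same = [(1, "2009-11-10")] * 100_000

print(self_join_rows(distinct))  # 100000
print(self_join_rows(same))      # 10000000000
```

The second figure equals the 10000000000 rows reported in the reducer log above, which is what a single shared key across 100k rows would produce.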
On 11/10/09 6:35 PM, "Defenestrator" <[email protected]>
wrote:
I would definitely appreciate any insights on this from the list.
I tried to reduce the query down to something that is easily
understood, and Hive still shows pretty poor join performance
on a three-node Hadoop cluster.
drop table m1;
drop table foo1;
create table m1 (
mid int,
aid int,
dt string);
LOAD DATA LOCAL INPATH 'm1' OVERWRITE INTO TABLE m1;
create table foo1 (
aid_1 int,
aid_2 int,
mid bigint,
dt bigint
);
set mapred.reduce.tasks=32;
insert overwrite table foo1
select m1.aid as aid_1, m2.aid as aid_2, count(1), m1.dt as dt
from m1 m1 join m1 m2 on m1.aid = m2.aid and m1.dt = m2.dt group by
m1.aid, m2.aid, m1.dt;
Attached is the file I'm using, which only has 100k rows. I've
looked at the benchmark
(http://issues.apache.org/jira/secure/attachment/12411185/hive_benchmark_2009-06-18.pdf)
and Hive seems to be able to join much bigger data sets. I also
tried running the same query on a single-node DBMS on my desktop,
and it returns results in less than 3 minutes, while Hive
has been running for at least 20 minutes now.
Thanks.
On Tue, Nov 10, 2009 at 3:53 PM, Ryan LeCompte <[email protected]>
wrote:
Any thoughts on this? I've only had luck by reducing the data on
each side of the join. Is this something Hive might be able to
improve in a future release of the query plan optimization?
Thanks,
Ryan
On Nov 3, 2009, at 10:55 PM, Ryan LeCompte <[email protected]>
wrote:
I've had a similar issue with a small cluster. Is there any way
that you can reduce the size of the data being joined on both
sides? If you search the forums for join issue, you will see the
thread for my issue and get some tips.
Thanks,
Ryan
On Nov 3, 2009, at 10:45 PM, Defenestrator <[email protected]> wrote:
I was able to increase the number of reduce jobs manually to 32.
However, it finishes 28 of them and the other 4 show the same
behavior of using 100% CPU and consuming a lot of memory. I
suspect that it might be an issue with the reduce job itself.
Is there a way to figure out what these jobs are doing exactly?
Thanks.
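A possible explanation for why extra reducers do not help with skewed keys: assuming rows are routed to reducers by hashing the join key modulo the number of reducers (as Hadoop's default HashPartitioner does), all rows with the same key land on the same reducer no matter how many reducers exist. A toy Python sketch, with made-up keys and a hypothetical reducer_load helper:

```python
from collections import Counter


def reducer_load(keys, num_reducers):
    """Count rows per reducer when routing by hash(key) % num_reducers."""
    load = Counter(hash(k) % num_reducers for k in keys)
    return [load.get(r, 0) for r in range(num_reducers)]


# Made-up data: 4 hot keys carry 1000 rows each, 96 other keys carry 1 row each.
keys = [k for k in range(4) for _ in range(1000)] + list(range(4, 100))

load = reducer_load(keys, 32)
busy = sorted(load, reverse=True)[:4]
print(load)  # rows per reducer, reducers 0..31
print(busy)  # the four heaviest reducers
```

In this toy data the four hot keys keep four reducers busy with about a thousand rows each, while the other 28 reducers see only a few rows. Raising mapred.reduce.tasks spreads distinct keys more thinly but cannot split a single key across reducers.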
On Tue, Nov 3, 2009 at 6:53 PM, Namit Jain <[email protected]> wrote:
The number of reducers is inferred from the input data size, but
you can always override it by setting mapred.reduce.tasks.
From: Defenestrator [mailto:[email protected]]
Sent: Tuesday, November 03, 2009 6:46 PM
To: [email protected]
Subject: Re: Self join problem
Hi Namit,
Thanks for your suggestion.
I tried changing the query as you had suggested by moving the m1.dt
= m2.dt condition to the on clause. It increased the number of reduce
jobs to 2. So now there are two processes running on two nodes at 100%
CPU and consuming a lot of memory. Is there a reason why Hive doesn't
spawn more reduce jobs for this query?
On Tue, Nov 3, 2009 at 4:47 PM, Namit Jain <[email protected]> wrote:
Move the join condition into the on clause:
insert overwrite table foo1
select m1.id as id_1, m2.id as id_2, count(1), m1.dt
from m1 join m2 on m1.dt = m2.dt where m1.id <> m2.id and m1.id < m2.id
group by m1.id, m2.id, m1.dt;
From: Defenestrator [mailto:[email protected]]
Sent: Tuesday, November 03, 2009 4:44 PM
To: [email protected]
Subject: Self join problem
Hello,
I'm trying to run the following query where m1 and m2 have the same
data (>29M rows) on a 3-node hadoop cluster. I'm essentially
trying to do a self join. It ends up running 269 map jobs and 1
reduce job. The map jobs complete but the reduce job just runs on
one process on one of the hadoop nodes at 100% cpu utilization and
just slowly increases in memory consumption. The reduce job never
goes beyond 82% complete despite letting it run for a day.
I am running on 0.5.0 based on this morning's trunk.
insert overwrite table foo1
select m1.id as id_1, m2.id as id_2, count(1), m1.dt
from m1 join m2 where m1.id <> m2.id and m1.id < m2.id and m1.dt = m2.dt
group by m1.id, m2.id, m1.dt;
Any input would be appreciated.