2010/2/19 Gang Luo <[email protected]>:
> Hi Yongqiang,
> that sounds interesting.
>
> So, when you mention *hive-chunk*, do you mean *bucket* in the database sense?
> Can I further say that Hive is actually doing what a database does in a hash
> join (divide the hash table into several buckets, load the buckets in
> turn and join each with a block from the outer table)?
>
> If it works like this, what is the granularity of the block from the outer
> table? If the block is too small (e.g. one record), the swapping between memory
> and disk will be too frequent. If it is too large, it eats too much memory. Do
> you get a good balance here?
>
> Thanks.
>
> -Gang
>
>
> ----- Original Message ----
> From: Yongqiang He <[email protected]>
> To: [email protected]
> Sent: 2010/2/19 (Fri) 12:39:30 AM
> Subject: Re: map join and OOM
>
> Actually HIVE-917 only helps when the joining tables are bucketed.
>
> With hive-trunk (not sure about 0.5), there will be no more OOM in
> Hive's mapjoin, no matter how big your table is.
>
> On 2/18/10 3:17 PM, "Edward Capriolo" <[email protected]> wrote:
>
>> 2010/2/18 Gang Luo <[email protected]>:
>>> Some personal opinions here.
>>>
>>> The whole table resides in memory. It is stored in a hash table. So, the
>>> heap memory should be at least larger than the table size.
>>>
>>> Even if you double your heap size, I think the job will still possibly fail,
>>> because a hash table in Java is not a memory-efficient data structure (of
>>> course, this really depends on the number of records and the length of each
>>> record). I think map join can only handle a very small table (100 MB or so).
>>>
>>> -Gang
>>>
>>>
>>> ----- Original Message ----
>>> From: Edward Capriolo <[email protected]>
>>> To: [email protected]
>>> Sent: 2010/2/18 (Thu) 5:45:10 PM
>>> Subject: map join and OOM
>>>
>>> I have Hive 0.4.1-rc2. My query runs in "Time taken: 312.956 seconds"
>>> using the common map/reduce join. I was interested in using mapjoin, but I
>>> get an OOM error:
>>>
>>> hive>
>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>         at org.apache.hadoop.hive.ql.util.jdbm.recman.RecordFile.getNewNode(RecordFile.java:369)
>>>
>>> My pageviews table is 8GB and my client_ips table is ~1GB.
>>>
>>> <property>
>>>   <name>mapred.child.java.opts</name>
>>>   <value>-Xmx778m</value>
>>> </property>
>>>
>>> [ecapri...@nyhadoopdata10 ~]$ hive
>>> Hive history file=/tmp/ecapriolo/hive_job_log_ecapriolo_201002181717_253155276.txt
>>> hive> explain Select /*+ MAPJOIN(client_ips) */ clientip_id, client_ip,
>>>     SUM(bytes_sent) as X from pageviews join client_ips on
>>>     pageviews.clientip_id=client_ips.id where year=2010 AND month=02 and
>>>     day=17 group by clientip_id, client_ip
>>>     > ;
>>> OK
>>> ABSTRACT SYNTAX TREE:
>>>   (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF pageviews) (TOK_TABREF client_ips)
>>>   (= (. (TOK_TABLE_OR_COL pageviews) clientip_id) (. (TOK_TABLE_OR_COL client_ips) id))))
>>>   (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE))
>>>   (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST client_ips)))
>>>   (TOK_SELEXPR (TOK_TABLE_OR_COL clientip_id)) (TOK_SELEXPR (TOK_TABLE_OR_COL client_ip))
>>>   (TOK_SELEXPR (TOK_FUNCTION SUM (TOK_TABLE_OR_COL bytes_sent)) X))
>>>   (TOK_WHERE (and (AND (= (TOK_TABLE_OR_COL year) 2010) (= (TOK_TABLE_OR_COL month) 02))
>>>   (= (TOK_TABLE_OR_COL day) 17)))
>>>   (TOK_GROUPBY (TOK_TABLE_OR_COL clientip_id) (TOK_TABLE_OR_COL client_ip))))
>>>
>>> STAGE DEPENDENCIES:
>>>   Stage-1 is a root stage
>>>   Stage-2 depends on stages: Stage-1
>>>   Stage-0 is a root stage
>>>
>>> STAGE PLANS:
>>>   Stage: Stage-1
>>>     Map Reduce
>>>       Alias -> Map Operator Tree:
>>>         pageviews
>>>           TableScan
>>>             alias: pageviews
>>>             Filter Operator
>>>               predicate:
>>>                   expr: (((UDFToDouble(year) = UDFToDouble(2010)) and (UDFToDouble(month) = UDFToDouble(2))) and (UDFToDouble(day) = UDFToDouble(17)))
>>>                   type: boolean
>>>               Common Join Operator
>>>                 condition map:
>>>                      Inner Join 0 to 1
>>>                 condition expressions:
>>>                   0 {clientip_id} {bytes_sent} {year} {month} {day}
>>>                   1 {client_ip}
>>>                 keys:
>>>                   0
>>>                   1
>>>                 outputColumnNames: _col13, _col17, _col22, _col23, _col24, _col26
>>>                 Position of Big Table: 0
>>>                 File Output Operator
>>>                   compressed: false
>>>                   GlobalTableId: 0
>>>                   table:
>>>                       input format: org.apache.hadoop.mapred.SequenceFileInputFormat
>>>                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>>>       Local Work:
>>>         Map Reduce Local Work
>>>           Alias -> Map Local Tables:
>>>             client_ips
>>>               Fetch Operator
>>>                 limit: -1
>>>           Alias -> Map Local Operator Tree:
>>>             client_ips
>>>               TableScan
>>>                 alias: client_ips
>>>                 Common Join Operator
>>>                   condition map:
>>>                        Inner Join 0 to 1
>>>                   condition expressions:
>>>                     0 {clientip_id} {bytes_sent} {year} {month} {day}
>>>                     1 {client_ip}
>>>                   keys:
>>>                     0
>>>                     1
>>>                   outputColumnNames: _col13, _col17, _col22, _col23, _col24, _col26
>>>                   Position of Big Table: 0
>>>                   File Output Operator
>>>                     compressed: false
>>>                     GlobalTableId: 0
>>>                     table:
>>>                         input format: org.apache.hadoop.mapred.SequenceFileInputFormat
>>>                         output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>>>
>>>   Stage: Stage-2
>>>     Map Reduce
>>>       Alias -> Map Operator Tree:
>>>         hdfs://nyhadoopname1.ops.about.com:8020/tmp/hive-ecapriolo/975920219/10002
>>>           Select Operator
>>>             expressions:
>>>                   expr: _col13
>>>                   type: int
>>>                   expr: _col17
>>>                   type: int
>>>                   expr: _col22
>>>                   type: string
>>>                   expr: _col23
>>>                   type: string
>>>                   expr: _col24
>>>                   type: string
>>>                   expr: _col26
>>>                   type: string
>>>             outputColumnNames: _col13, _col17, _col22, _col23, _col24, _col26
>>>             Filter Operator
>>>               predicate:
>>>                   expr: (((UDFToDouble(_col22) = UDFToDouble(2010)) and (UDFToDouble(_col23) = UDFToDouble(2))) and (UDFToDouble(_col24) = UDFToDouble(17)))
>>>                   type: boolean
>>>               Select Operator
>>>                 expressions:
>>>                       expr: _col13
>>>                       type: int
>>>                       expr: _col26
>>>                       type: string
>>>                       expr: _col17
>>>                       type: int
>>>                 outputColumnNames: _col13, _col26, _col17
>>>                 Group By Operator
>>>                   aggregations:
>>>                         expr: sum(_col17)
>>>                   keys:
>>>                         expr: _col13
>>>                         type: int
>>>                         expr: _col26
>>>                         type: string
>>>                   mode: hash
>>>                   outputColumnNames: _col0, _col1, _col2
>>>                   Reduce Output Operator
>>>                     key expressions:
>>>                           expr: _col0
>>>                           type: int
>>>                           expr: _col1
>>>                           type: string
>>>                     sort order: ++
>>>                     Map-reduce partition columns:
>>>                           expr: _col0
>>>                           type: int
>>>                           expr: _col1
>>>                           type: string
>>>                     tag: -1
>>>                     value expressions:
>>>                           expr: _col2
>>>                           type: bigint
>>>       Reduce Operator Tree:
>>>         Group By Operator
>>>           aggregations:
>>>                 expr: sum(VALUE._col0)
>>>           keys:
>>>                 expr: KEY._col0
>>>                 type: int
>>>                 expr: KEY._col1
>>>                 type: string
>>>           mode: mergepartial
>>>           outputColumnNames: _col0, _col1, _col2
>>>           Select Operator
>>>             expressions:
>>>                   expr: _col0
>>>                   type: int
>>>                   expr: _col1
>>>                   type: string
>>>                   expr: _col2
>>>                   type: bigint
>>>             outputColumnNames: _col0, _col1, _col2
>>>             File Output Operator
>>>               compressed: false
>>>               GlobalTableId: 0
>>>               table:
>>>                   input format: org.apache.hadoop.mapred.TextInputFormat
>>>                   output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>
>>>   Stage: Stage-0
>>>     Fetch Operator
>>>       limit: -1
>>>
>>>
>>> Time taken: 4.511 seconds
>>>
>>> Q: is the 1GB client_ips table too large for a mapjoin? Memory is
>>> <value>-Xmx778m</value>. I could go higher, but I am not sure I want to;
>>> it may have a cascading effect.
>>> Q: is the table in a mapjoin held entirely in main memory? Or is this like
>>> a small database on each mapper?
>>>
>>> Any other hints? Thank you.
>>>
>>
>> Understood. Map join is not possible here. Really, 300s is a fine time
>> for my query.
>>
>> I do not think HIVE-917 will work. This is a star schema; the big table
>> needs to be joined with multiple tables, so we cannot choose one bucketing
>> that would work for all of them.
>>
>> Has anyone ever considered doing the map join with Derby? That way
>> mapjoin is not a main-memory operation.
>>
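As a rough illustration of the Derby idea in the message above, here is a plain Java + JDBC sketch (nothing Hive-specific, and not how Hive's mapjoin actually works): load the small client_ips table into an embedded Derby database on the task's local disk once, then probe it for each big-table row from map(). The class and method names are invented for the sketch, and derby.jar on the classpath is assumed.

    // Illustrative sketch only -- NOT Hive code. The "database" is just a
    // directory on the mapper's local disk, so the small table no longer has
    // to fit in the JVM heap.
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    public class DerbyBackedLookup implements AutoCloseable {
        private final Connection conn;
        private final PreparedStatement insert;
        private final PreparedStatement probe;

        public DerbyBackedLookup(String dbPath) throws Exception {
            // Embedded Derby database created on local disk.
            conn = DriverManager.getConnection("jdbc:derby:" + dbPath + ";create=true");
            conn.createStatement().executeUpdate(
                "CREATE TABLE client_ips (id INT PRIMARY KEY, client_ip VARCHAR(64))");
            insert = conn.prepareStatement("INSERT INTO client_ips VALUES (?, ?)");
            probe  = conn.prepareStatement("SELECT client_ip FROM client_ips WHERE id = ?");
        }

        /** Load one row of the small table (done once per mapper). */
        public void put(int id, String clientIp) throws Exception {
            insert.setInt(1, id);
            insert.setString(2, clientIp);
            insert.executeUpdate();
        }

        /** Probe from map() for each big-table row; returns null if no match. */
        public String lookup(int clientipId) throws Exception {
            probe.setInt(1, clientipId);
            ResultSet rs = probe.executeQuery();
            try {
                return rs.next() ? rs.getString(1) : null;
            } finally {
                rs.close();
            }
        }

        public void close() throws Exception {
            conn.close();
        }
    }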
@Ning
>> Map-join works with persistent data structure starting from branch 0.5. We
>> will keep a cache using HashMap in main memory and you can specify the # of
>> rows to be put in the cache. If it is too large, data will be spilled to a
>> disk-based hash table.

So I have a problem: I am trying to join 5 large tables. My pageview table is
partitioned by day (~6GB), but I have 4 other tables that can be about 1GB
each. Currently joining the 5 tables throws OOM.

I see only two ways I can solve this.
1) break my joins up into stages, each stage written to a Hive table

@Ning, or anyone else: with the enhancements to mapjoin in 0.5, will my 5-table
join work (theoretically) if I use mapjoin? Speed is not really an issue here,
but if joining 2+ tables is not practical in most cases, I need to start
planning for this now.

Thanks
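The cache-and-spill behaviour described in the quoted paragraph above has roughly the following shape. This is a simplified, illustrative Java sketch rather than Hive's code (Hive's actual disk tier is the JDBM-based persistent hash table visible in the RecordFile stack trace earlier in the thread); the class and method names are invented, and the disk tier here is a naive append-only file scanned on a miss, just to show the idea that rows past a configurable count stop going into the in-memory HashMap.

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.PrintWriter;
    import java.util.HashMap;
    import java.util.Map;

    public class SpillingRowCache {
        private final int maxRowsInMemory;     // analogous to a "# of rows to cache" knob
        private final Map<String, String> memory = new HashMap<String, String>();
        private final File spillFile;
        private final PrintWriter spillWriter;

        public SpillingRowCache(int maxRowsInMemory) throws IOException {
            this.maxRowsInMemory = maxRowsInMemory;
            this.spillFile = File.createTempFile("mapjoin-spill", ".tsv");
            this.spillWriter = new PrintWriter(new FileWriter(spillFile));
        }

        public void put(String key, String value) {
            if (memory.size() < maxRowsInMemory) {
                memory.put(key, value);                  // hot tier: in-memory HashMap
            } else {
                spillWriter.println(key + "\t" + value); // cold tier: spill to local disk
            }
        }

        public String get(String key) throws IOException {
            String hit = memory.get(key);
            if (hit != null) {
                return hit;
            }
            spillWriter.flush();
            // Miss: fall back to the disk tier. A real implementation would use an
            // indexed, disk-based hash table instead of a linear scan.
            BufferedReader reader = new BufferedReader(new FileReader(spillFile));
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    int tab = line.indexOf('\t');
                    if (line.substring(0, tab).equals(key)) {
                        return line.substring(tab + 1);
                    }
                }
            } finally {
                reader.close();
            }
            return null;
        }
    }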

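On the earlier question of whether a ~1GB client_ips table can fit under -Xmx778m: a crude experiment along the lines below (the row count and the key/value shapes are invented) tends to show a per-row cost in the low hundreds of bytes on a 64-bit JVM once the boxed key, the value String and the HashMap entry are counted, which is the substance of Gang's warning that a Java hash table is not a memory-efficient way to hold a table.

    import java.util.HashMap;
    import java.util.Map;

    public class MapJoinHeapEstimate {
        public static void main(String[] args) {
            int rows = 1000000;
            Map<Integer, String> table = new HashMap<Integer, String>();
            Runtime rt = Runtime.getRuntime();
            System.gc();
            long before = rt.totalMemory() - rt.freeMemory();
            for (int i = 0; i < rows; i++) {
                // Synthetic ~15-character value standing in for a client_ip string.
                table.put(i, "192.168." + (i % 256) + "." + (i / 256 % 256));
            }
            System.gc();
            long after = rt.totalMemory() - rt.freeMemory();
            // Rough measurement only: totalMemory - freeMemory is approximate.
            System.out.printf("%d rows used ~%d MB of heap (%.0f bytes per row)%n",
                    rows, (after - before) >> 20, (after - before) / (double) rows);
            System.out.println("map size: " + table.size());
        }
    }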