Hi Yongqiang, that sounds interesting. When you mention *hive-chunk*, do you mean a *bucket* in the database sense? Could I go further and say that Hive is essentially doing what a database does in a hash join: dividing the hash table into several buckets, loading the buckets in turn, and joining each with a block from the outer table? A rough sketch of what I am picturing is below.
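To be concrete, this is a minimal sketch of a grace-style partitioned hash join. The class name, row layout, and bucket count are invented for illustration, and plain in-memory lists stand in for the partition files a real engine would spill to disk; this is not Hive's actual code:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GraceHashJoinSketch {
    static final int NUM_BUCKETS = 8;

    // Rows are {joinKey, payload}. Both inputs are partitioned by
    // hash(key), then bucket pairs are processed one at a time, so only
    // one build-side bucket is ever held in memory.
    public static List<String[]> join(List<String[]> build, List<String[]> probe) {
        List<List<String[]>> buildParts = partition(build);
        List<List<String[]>> probeParts = partition(probe);

        List<String[]> out = new ArrayList<>();
        for (int b = 0; b < NUM_BUCKETS; b++) {
            // Load one build bucket into a hash table...
            Map<String, List<String[]>> table = new HashMap<>();
            for (String[] row : buildParts.get(b)) {
                table.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
            }
            // ...then stream the matching probe bucket through it.
            for (String[] row : probeParts.get(b)) {
                List<String[]> matches = table.get(row[0]);
                if (matches == null) {
                    continue;
                }
                for (String[] m : matches) {
                    out.add(new String[] { row[0], row[1], m[1] });
                }
            }
        }
        return out;
    }

    // A real system would spill each partition to a disk file; in-memory
    // lists keep the sketch short.
    private static List<List<String[]>> partition(List<String[]> rows) {
        List<List<String[]>> parts = new ArrayList<>();
        for (int i = 0; i < NUM_BUCKETS; i++) {
            parts.add(new ArrayList<>());
        }
        for (String[] row : rows) {
            parts.get(Math.floorMod(row[0].hashCode(), NUM_BUCKETS)).add(row);
        }
        return parts;
    }
}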
If it goes like this, what is the granularity of the block from the outer table? If the block is too small (e.g. one record), the swapping between memory and disk will be too frequent; if it is too large, it eats too much memory. Do you strike a good balance here? Thanks.

-Gang

----- Original Message ----
From: Yongqiang He <[email protected]>
To: [email protected]
Sent: 2010/2/19 (Fri) 12:39:30 AM
Subject: Re: map join and OOM

Actually, HIVE-917 only helps when the joining tables are bucketed. With hive-trunk (not sure about 0.5), there will be no more OOM in Hive's mapjoin, no matter how big your table is.

On 2/18/10 3:17 PM, "Edward Capriolo" <[email protected]> wrote:

> 2010/2/18 Gang Luo <[email protected]>:
>> Some personal opinions here.
>>
>> The whole table resides in memory, stored in a hash table, so the heap
>> should be at least as large as the table.
>>
>> Even if you double your heap size, I think the job will probably still
>> fail, because a hash table in Java is not a memory-efficient data
>> structure (of course, this really depends on the number of records and
>> the length of each record). I think map join can only handle very
>> small tables (100 MB or so).
>>
>> -Gang
>>
>>
>> ----- Original Message ----
>> From: Edward Capriolo <[email protected]>
>> To: [email protected]
>> Sent: 2010/2/18 (Thu) 5:45:10 PM
>> Subject: map join and OOM
>>
>> I have Hive 0.4.1-rc2. My query runs in "Time taken: 312.956 seconds"
>> using the regular map/reduce join. I was interested in using mapjoin,
>> but I get an OOM error:
>>
>> hive>
>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>     at org.apache.hadoop.hive.ql.util.jdbm.recman.RecordFile.getNewNode(RecordFile.java:369)
>>
>> My pageviews table is 8 GB and my client_ips table is ~1 GB.
>>
>> <property>
>>   <name>mapred.child.java.opts</name>
>>   <value>-Xmx778m</value>
>> </property>
>>
>> [ecapri...@nyhadoopdata10 ~]$ hive
>> Hive history file=/tmp/ecapriolo/hive_job_log_ecapriolo_201002181717_253155276.txt
>> hive> explain Select /*+ MAPJOIN( client_ips )*/ clientip_id, client_ip,
>>     SUM(bytes_sent) as X from pageviews join client_ips on
>>     pageviews.clientip_id=client_ips.id where year=2010 AND month=02 and
>>     day=17 group by clientip_id, client_ip
>>     > ;
>> OK
>> ABSTRACT SYNTAX TREE:
>>   (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF pageviews) (TOK_TABREF client_ips)
>>   (= (. (TOK_TABLE_OR_COL pageviews) clientip_id) (. (TOK_TABLE_OR_COL client_ips) id))))
>>   (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST
>>   (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST client_ips))) (TOK_SELEXPR (TOK_TABLE_OR_COL clientip_id))
>>   (TOK_SELEXPR (TOK_TABLE_OR_COL client_ip)) (TOK_SELEXPR (TOK_FUNCTION SUM
>>   (TOK_TABLE_OR_COL bytes_sent)) X)) (TOK_WHERE (and (AND (= (TOK_TABLE_OR_COL year) 2010)
>>   (= (TOK_TABLE_OR_COL month) 02)) (= (TOK_TABLE_OR_COL day) 17)))
>>   (TOK_GROUPBY (TOK_TABLE_OR_COL clientip_id) (TOK_TABLE_OR_COL client_ip))))
>>
>> STAGE DEPENDENCIES:
>>   Stage-1 is a root stage
>>   Stage-2 depends on stages: Stage-1
>>   Stage-0 is a root stage
>>
>> STAGE PLANS:
>>   Stage: Stage-1
>>     Map Reduce
>>       Alias -> Map Operator Tree:
>>         pageviews
>>           TableScan
>>             alias: pageviews
>>             Filter Operator
>>               predicate:
>>                   expr: (((UDFToDouble(year) = UDFToDouble(2010)) and (UDFToDouble(month) = UDFToDouble(2))) and (UDFToDouble(day) = UDFToDouble(17)))
>>                   type: boolean
>>               Common Join Operator
>>                 condition map:
>>                      Inner Join 0 to 1
>>                 condition expressions:
>>                   0 {clientip_id} {bytes_sent} {year} {month} {day}
>>                   1 {client_ip}
>>                 keys:
>>                   0
>>                   1
>>                 outputColumnNames: _col13, _col17, _col22, _col23, _col24, _col26
>>                 Position of Big Table: 0
>>                 File Output Operator
>>                   compressed: false
>>                   GlobalTableId: 0
>>                   table:
>>                       input format: org.apache.hadoop.mapred.SequenceFileInputFormat
>>                       output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>>       Local Work:
>>         Map Reduce Local Work
>>           Alias -> Map Local Tables:
>>             client_ips
>>               Fetch Operator
>>                 limit: -1
>>           Alias -> Map Local Operator Tree:
>>             client_ips
>>               TableScan
>>                 alias: client_ips
>>                 Common Join Operator
>>                   condition map:
>>                        Inner Join 0 to 1
>>                   condition expressions:
>>                     0 {clientip_id} {bytes_sent} {year} {month} {day}
>>                     1 {client_ip}
>>                   keys:
>>                     0
>>                     1
>>                   outputColumnNames: _col13, _col17, _col22, _col23, _col24, _col26
>>                   Position of Big Table: 0
>>                   File Output Operator
>>                     compressed: false
>>                     GlobalTableId: 0
>>                     table:
>>                         input format: org.apache.hadoop.mapred.SequenceFileInputFormat
>>                         output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>>
>>   Stage: Stage-2
>>     Map Reduce
>>       Alias -> Map Operator Tree:
>>         hdfs://nyhadoopname1.ops.about.com:8020/tmp/hive-ecapriolo/975920219/10002
>>           Select Operator
>>             expressions:
>>                   expr: _col13
>>                   type: int
>>                   expr: _col17
>>                   type: int
>>                   expr: _col22
>>                   type: string
>>                   expr: _col23
>>                   type: string
>>                   expr: _col24
>>                   type: string
>>                   expr: _col26
>>                   type: string
>>             outputColumnNames: _col13, _col17, _col22, _col23, _col24, _col26
>>             Filter Operator
>>               predicate:
>>                   expr: (((UDFToDouble(_col22) = UDFToDouble(2010)) and (UDFToDouble(_col23) = UDFToDouble(2))) and (UDFToDouble(_col24) = UDFToDouble(17)))
>>                   type: boolean
>>               Select Operator
>>                 expressions:
>>                       expr: _col13
>>                       type: int
>>                       expr: _col26
>>                       type: string
>>                       expr: _col17
>>                       type: int
>>                 outputColumnNames: _col13, _col26, _col17
>>                 Group By Operator
>>                   aggregations:
>>                         expr: sum(_col17)
>>                   keys:
>>                         expr: _col13
>>                         type: int
>>                         expr: _col26
>>                         type: string
>>                   mode: hash
>>                   outputColumnNames: _col0, _col1, _col2
>>                   Reduce Output Operator
>>                     key expressions:
>>                           expr: _col0
>>                           type: int
>>                           expr: _col1
>>                           type: string
>>                     sort order: ++
>>                     Map-reduce partition columns:
>>                           expr: _col0
>>                           type: int
>>                           expr: _col1
>>                           type: string
>>                     tag: -1
>>                     value expressions:
>>                           expr: _col2
>>                           type: bigint
>>       Reduce Operator Tree:
>>         Group By Operator
>>           aggregations:
>>                 expr: sum(VALUE._col0)
>>           keys:
>>                 expr: KEY._col0
>>                 type: int
>>                 expr: KEY._col1
>>                 type: string
>>           mode: mergepartial
>>           outputColumnNames: _col0, _col1, _col2
>>           Select Operator
>>             expressions:
>>                   expr: _col0
>>                   type: int
>>                   expr: _col1
>>                   type: string
>>                   expr: _col2
>>                   type: bigint
>>             outputColumnNames: _col0, _col1, _col2
>>             File Output Operator
>>               compressed: false
>>               GlobalTableId: 0
>>               table:
>>                   input format: org.apache.hadoop.mapred.TextInputFormat
>>                   output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>
>>   Stage: Stage-0
>>     Fetch Operator
>>       limit: -1
>>
>>
>> Time taken: 4.511 seconds
>>
>> Q: Is the 1 GB client_ips table too large for a mapjoin? Memory is
>> <value>-Xmx778m</value>. I could go higher, but I am not sure I want
>> to; it may have a cascading effect.
>> Q: Is the table in a mapjoin held entirely in main memory, or is this
>> like a small database on each mapper?
>>
>> Any other hints? Thank you.
>>
>
> Understood. Mapjoin is not possible here. Really, 300 s is a fine time
> for my query.
>
> HIVE-917 won't work, I don't think. This is a star schema; the big table
> needs to be joined with multiple tables, so we cannot choose one
> bucketing that would work for all of them.
>
> Has anyone ever considered doing the map join with Derby? That way
> mapjoin would not be a main-memory operation.
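On Gang's point above that a Java hash table is not memory-efficient, a back-of-envelope sketch shows why a ~1 GB on-disk table can blow through a 778 MB heap. The row count and per-object sizes below are rough assumptions for a 64-bit JVM of this era, not measured values:

// Back-of-envelope estimate of HashMap memory overhead for the small
// (build) side of a map join. All sizes are rough 64-bit JVM
// assumptions, not measurements.
public class MapJoinMemoryEstimate {
    public static void main(String[] args) {
        long rows = 20_000_000L;  // assumed row count for a ~1 GB table
        long rawBytesPerRow = 50; // ~1 GB / 20 M rows on disk

        // Assumed per-entry costs in a HashMap<Integer, String>:
        long entryObj  = 48; // entry object: header + key/value/next refs + hash
        long tableSlot = 8;  // reference in the bucket array (ignoring load factor)
        long keyObj    = 16; // boxed Integer: header + int
        long valueObj  = 40 + 2 * rawBytesPerRow; // String fields + UTF-16 chars

        long perRow = entryObj + tableSlot + keyObj + valueObj;
        long totalMB = rows * perRow / (1024 * 1024);

        System.out.println("Estimated bytes per row in memory: " + perRow);
        System.out.println("Estimated heap for the hash table: " + totalMB + " MB");
    }
}

Under these assumptions each 50-byte row costs roughly 200 bytes once boxed and linked into the hash table, so the ~1 GB client_ips table would need around 4 GB of heap, several times the -Xmx778m configured above.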
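As for the Derby idea, here is a minimal sketch of what a disk-backed small-table lookup could look like over plain JDBC with embedded Derby. The class, table, and column names are invented for illustration; this is only the idea, not anything Hive implements:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;

// Sketch of a disk-backed small-table lookup for a map join, using
// embedded Derby instead of an in-memory hash table. Each mapper would
// build this once, then probe it per big-table row.
public class DerbyBackedLookup implements AutoCloseable {
    private final Connection conn;
    private final PreparedStatement insert;
    private final PreparedStatement probe;

    public DerbyBackedLookup(String dbPath) throws Exception {
        conn = DriverManager.getConnection("jdbc:derby:" + dbPath + ";create=true");
        try (Statement st = conn.createStatement()) {
            st.execute("CREATE TABLE small_side (id INT PRIMARY KEY, client_ip VARCHAR(64))");
        }
        insert = conn.prepareStatement("INSERT INTO small_side VALUES (?, ?)");
        probe = conn.prepareStatement("SELECT client_ip FROM small_side WHERE id = ?");
    }

    public void put(int id, String clientIp) throws Exception {
        insert.setInt(1, id);
        insert.setString(2, clientIp);
        insert.executeUpdate();
    }

    // Probe by join key; the PRIMARY KEY index makes this a disk-backed
    // point lookup rather than a heap-resident hash probe.
    public String get(int id) throws Exception {
        probe.setInt(1, id);
        try (ResultSet rs = probe.executeQuery()) {
            return rs.next() ? rs.getString(1) : null;
        }
    }

    @Override
    public void close() throws Exception {
        conn.close();
    }
}

The granularity trade-off Gang raised at the top of the thread would still apply: every probe becomes an indexed disk lookup, so batching probes or caching hot keys would matter for performance.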
