Map-join works with a persistent data structure starting from the 0.5 branch. We 
keep a cache in main memory using a HashMap, and you can specify the number of 
rows to be held in that cache. If the table is too large, the remaining data is 
spilled to a disk-based hash table. 
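
For example, something along these lines (property names as they stand on the 
0.5 branch, as far as I recall; please check hive-default.xml for the exact 
names and defaults):

hive> set hive.mapjoin.cache.numrows=25000;
hive> set hive.mapjoin.maxsize=100000;

The first is the number of rows kept in the in-memory HashMap before the rest 
is spilled to the disk-based table; the second is an overall cap on the number 
of small-table rows before the map join gives up.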

Thanks,
Ning

On Feb 18, 2010, at 3:17 PM, Edward Capriolo wrote:

> 2010/2/18 Gang Luo <[email protected]>:
>> Some personal opinions here.
>> 
>> The whole table resides in memory, stored in a hash table, so the heap 
>> needs to be at least as large as the table itself.
>> 
>> Even if you double your heap size, I think the job will probably still fail, 
>> because a Java hash table is not a memory-efficient data structure (of course, 
>> this really depends on the number of records and the length of each record). 
>> I think map join can only handle very small tables (100 MB or so).
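>> 
>> As a rough back-of-envelope (my own estimate, so take the numbers loosely): 
>> each row of the small table becomes a HashMap entry plus a boxed key plus the 
>> row object itself, which easily adds 100+ bytes of JVM overhead per row before 
>> counting the data. A ~1 GB table with tens of millions of rows can therefore 
>> need several GB of heap, far beyond a 778 MB task limit.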
>> 
>> -Gang
>> 
>> 
>> ----- Original Message ----
>> From: Edward Capriolo <[email protected]>
>> To: [email protected]
>> Sent: 2010/2/18 (Thu) 5:45:10 PM
>> Subject: map join and OOM
>> 
>> I have Hive 0.4.1-rc2. My query runs in 312.956 seconds using the regular
>> map/reduce join. I was interested in using mapjoin instead, but I get
>> an OOM error.
>> 
>> hive>
>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>   at 
>> org.apache.hadoop.hive.ql.util.jdbm.recman.RecordFile.getNewNode(RecordFile.java:369)
>> 
>> My pageviews table is 8 GB and my client_ips table is ~1 GB.
>> <property>
>> <name>mapred.child.java.opts</name>
>> <value>-Xmx778m</value>
>> </property>
>> 
>> [ecapri...@nyhadoopdata10 ~]$ hive
>> Hive history 
>> file=/tmp/ecapriolo/hive_job_log_ecapriolo_201002181717_253155276.txt
>> hive> explain Select /*+ MAPJOIN( client_ips )*/clientip_id,client_ip,
>> SUM(bytes_sent) as X from pageviews join client_ips on
>> pageviews.clientip_id=client_ips.id  where year=2010 AND month=02 and
>> day=17 group by clientip_id,client_ip
>>> ;
>> OK
>> ABSTRACT SYNTAX TREE:
>> (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF pageviews) (TOK_TABREF
>> client_ips) (= (. (TOK_TABLE_OR_COL pageviews) clientip_id) (.
>> (TOK_TABLE_OR_COL client_ips) id)))) (TOK_INSERT (TOK_DESTINATION
>> (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT
>> TOK_MAPJOIN (TOK_HINTARGLIST client_ips))) (TOK_SELEXPR
>> (TOK_TABLE_OR_COL clientip_id)) (TOK_SELEXPR (TOK_TABLE_OR_COL
>> client_ip)) (TOK_SELEXPR (TOK_FUNCTION SUM (TOK_TABLE_OR_COL
>> bytes_sent)) X)) (TOK_WHERE (and (AND (= (TOK_TABLE_OR_COL year) 2010)
>> (= (TOK_TABLE_OR_COL month) 02)) (= (TOK_TABLE_OR_COL day) 17)))
>> (TOK_GROUPBY (TOK_TABLE_OR_COL clientip_id) (TOK_TABLE_OR_COL
>> client_ip))))
>> 
>> STAGE DEPENDENCIES:
>> Stage-1 is a root stage
>> Stage-2 depends on stages: Stage-1
>> Stage-0 is a root stage
>> 
>> STAGE PLANS:
>> Stage: Stage-1
>>   Map Reduce
>>     Alias -> Map Operator Tree:
>>       pageviews
>>         TableScan
>>           alias: pageviews
>>           Filter Operator
>>             predicate:
>>                 expr: (((UDFToDouble(year) = UDFToDouble(2010)) and
>> (UDFToDouble(month) = UDFToDouble(2))) and (UDFToDouble(day) =
>> UDFToDouble(17)))
>>                 type: boolean
>>             Common Join Operator
>>               condition map:
>>                    Inner Join 0 to 1
>>               condition expressions:
>>                 0 {clientip_id} {bytes_sent} {year} {month} {day}
>>                 1 {client_ip}
>>               keys:
>>                 0
>>                 1
>>               outputColumnNames: _col13, _col17, _col22, _col23,
>> _col24, _col26
>>               Position of Big Table: 0
>>               File Output Operator
>>                 compressed: false
>>                 GlobalTableId: 0
>>                 table:
>>                     input format:
>> org.apache.hadoop.mapred.SequenceFileInputFormat
>>                     output format:
>> org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>>     Local Work:
>>       Map Reduce Local Work
>>         Alias -> Map Local Tables:
>>           client_ips
>>             Fetch Operator
>>               limit: -1
>>         Alias -> Map Local Operator Tree:
>>           client_ips
>>             TableScan
>>               alias: client_ips
>>               Common Join Operator
>>                 condition map:
>>                      Inner Join 0 to 1
>>                 condition expressions:
>>                   0 {clientip_id} {bytes_sent} {year} {month} {day}
>>                   1 {client_ip}
>>                 keys:
>>                   0
>>                   1
>>                 outputColumnNames: _col13, _col17, _col22, _col23,
>> _col24, _col26
>>                 Position of Big Table: 0
>>                 File Output Operator
>>                   compressed: false
>>                   GlobalTableId: 0
>>                   table:
>>                       input format:
>> org.apache.hadoop.mapred.SequenceFileInputFormat
>>                       output format:
>> org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>> 
>> Stage: Stage-2
>>   Map Reduce
>>     Alias -> Map Operator Tree:
>>       
>> hdfs://nyhadoopname1.ops.about.com:8020/tmp/hive-ecapriolo/975920219/10002
>>         Select Operator
>>           expressions:
>>                 expr: _col13
>>                 type: int
>>                 expr: _col17
>>                 type: int
>>                 expr: _col22
>>                 type: string
>>                 expr: _col23
>>                 type: string
>>                 expr: _col24
>>                 type: string
>>                 expr: _col26
>>                 type: string
>>           outputColumnNames: _col13, _col17, _col22, _col23, _col24, _col26
>>           Filter Operator
>>             predicate:
>>                 expr: (((UDFToDouble(_col22) = UDFToDouble(2010))
>> and (UDFToDouble(_col23) = UDFToDouble(2))) and (UDFToDouble(_col24) =
>> UDFToDouble(17)))
>>                 type: boolean
>>             Select Operator
>>               expressions:
>>                     expr: _col13
>>                     type: int
>>                     expr: _col26
>>                     type: string
>>                     expr: _col17
>>                     type: int
>>               outputColumnNames: _col13, _col26, _col17
>>               Group By Operator
>>                 aggregations:
>>                       expr: sum(_col17)
>>                 keys:
>>                       expr: _col13
>>                       type: int
>>                       expr: _col26
>>                       type: string
>>                 mode: hash
>>                 outputColumnNames: _col0, _col1, _col2
>>                 Reduce Output Operator
>>                   key expressions:
>>                         expr: _col0
>>                         type: int
>>                         expr: _col1
>>                         type: string
>>                   sort order: ++
>>                   Map-reduce partition columns:
>>                         expr: _col0
>>                         type: int
>>                         expr: _col1
>>                         type: string
>>                   tag: -1
>>                   value expressions:
>>                         expr: _col2
>>                         type: bigint
>>     Reduce Operator Tree:
>>       Group By Operator
>>         aggregations:
>>               expr: sum(VALUE._col0)
>>         keys:
>>               expr: KEY._col0
>>               type: int
>>               expr: KEY._col1
>>               type: string
>>         mode: mergepartial
>>         outputColumnNames: _col0, _col1, _col2
>>         Select Operator
>>           expressions:
>>                 expr: _col0
>>                 type: int
>>                 expr: _col1
>>                 type: string
>>                 expr: _col2
>>                 type: bigint
>>           outputColumnNames: _col0, _col1, _col2
>>           File Output Operator
>>             compressed: false
>>             GlobalTableId: 0
>>             table:
>>                 input format: org.apache.hadoop.mapred.TextInputFormat
>>                 output format:
>> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>> 
>> Stage: Stage-0
>>   Fetch Operator
>>     limit: -1
>> 
>> 
>> Time taken: 4.511 seconds
>> 
>> Q: Is the 1 GB client_ips table too large for a mapjoin?
>> Memory is <value>-Xmx778m</value>. I could go higher, but I am not sure I
>> want to; it may have a cascading effect.
>> Q: Is the table in a mapjoin held entirely in main memory, or is this like
>> a small database on each mapper?
>> 
>> Any other hints? Thank you.
>> 
> 
> Understood. A mapjoin is not possible here. Really, 300s is a fine time
> for my query.
> 
> I do not think HIVE-917 will work either. This is a star schema; the big table
> needs to be joined with multiple tables, so we cannot choose one bucketing
> that would work for all of them.
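> 
> (For context, a rough sketch of what the bucketed map join expects; the exact
> property name is my assumption, so double-check it. Both tables have to be
> bucketed on the one join key, with the MAPJOIN hint still in place:
> 
>   CREATE TABLE pageviews (...) CLUSTERED BY (clientip_id) INTO 32 BUCKETS;
>   CREATE TABLE client_ips (...) CLUSTERED BY (id) INTO 32 BUCKETS;
>   set hive.optimize.bucketmapjoin=true;
> 
> Since pageviews would also have to join other dimension tables on different
> keys, no single CLUSTERED BY choice covers them all.)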
> 
> Has anyone ever considered doing the map-join with Derby? That way the
> mapjoin would not be a purely in-memory operation.
