2010/2/19 Gang Luo <[email protected]>:
> Hi Yongqiang,
> that sounds interesting.
>
> So, when you mention *hive-chunk*, do you mean *bucket* in the database sense?
> Can I go further and say that Hive is actually doing what a database does in a hash
> join (divide the hash table into several buckets, load the buckets in
> turn, and join each with a block from the outer table)?
>
> If it works like this, what is the granularity of the block from the outer
> table? If the block is too small (e.g. one record), the swapping between memory
> and disk will be too frequent; if it is too large, it eats too much memory. How do
> you strike a good balance here?
>
> Thanks.
>
> -Gang
>
>
>
> ----- Original Message ----
> From: Yongqiang He <[email protected]>
> To: [email protected]
> Sent: 2010/2/19 (Fri) 12:39:30 AM
> Subject: Re: map join and OOM
>
> Actually, HIVE-917 only helps when the joined tables are bucketed.
>
> With hive-trunk (not sure about 0.5), there will no longer be an OOM in
> Hive's mapjoin, no matter how big your table is.
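>
> For the HIVE-917 case, both tables have to be bucketed on the join key, roughly
> like this (table/column names and the bucket count are just placeholders, and
> double-check the property name):
>
>   CREATE TABLE client_ips (id INT, client_ip STRING)
>   CLUSTERED BY (id) INTO 32 BUCKETS;
>
>   set hive.optimize.bucketmapjoin = true;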
>
> On 2/18/10 3:17 PM, "Edward Capriolo" <[email protected]> wrote:
>
>> 2010/2/18 Gang Luo <[email protected]>:
>>> some personal opinions here.
>>>
>>> The whole table resides in memory, stored in a hash table, so the heap
>>> has to be at least as large as the table itself.
>>>
>>> Even if you double your heap size, I think the job may still fail, because a
>>> hash table in Java is not a memory-efficient data structure (of course, this
>>> really depends on the number of records and the length of each record). I think
>>> map join can only handle very small tables (100 MB or so).
>>>
>>>  -Gang
>>>
>>>
>>> ----- Original Message ----
>>> From: Edward Capriolo <[email protected]>
>>> To: [email protected]
>>> Sent: 2010/2/18 (Thu) 5:45:10 PM
>>> Subject: map join and OOM
>>>
>>> I have Hive 0.4.1-rc2. My query runs in 312.956 seconds ("Time taken")
>>> using the common map/reduce join. When I tried using mapjoin instead,
>>> I got an OOM error.
>>>
>>> hive>
>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>    at org.apache.hadoop.hive.ql.util.jdbm.recman.RecordFile.getNewNode(RecordFile.java:369)
>>>
>>> My pageviews table is 8 GB and my client_ips table is ~1 GB.
>>> <property>
>>> <name>mapred.child.java.opts</name>
>>> <value>-Xmx778m</value>
>>> </property>
>>>
>>> [ecapri...@nyhadoopdata10 ~]$ hive
>>> Hive history
>>> file=/tmp/ecapriolo/hive_job_log_ecapriolo_201002181717_253155276.txt
>>> hive> explain Select /*+ MAPJOIN( client_ips )*/clientip_id,client_ip,
>>> SUM(bytes_sent) as X from pageviews join client_ips on
>>> pageviews.clientip_id=client_ips.id  where year=2010 AND month=02 and
>>> day=17 group by clientip_id,client_ip
>>>> ;
>>> OK
>>> ABSTRACT SYNTAX TREE:
>>>  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF pageviews) (TOK_TABREF
>>> client_ips) (= (. (TOK_TABLE_OR_COL pageviews) clientip_id) (.
>>> (TOK_TABLE_OR_COL client_ips) id)))) (TOK_INSERT (TOK_DESTINATION
>>> (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT
>>> TOK_MAPJOIN (TOK_HINTARGLIST client_ips))) (TOK_SELEXPR
>>> (TOK_TABLE_OR_COL clientip_id)) (TOK_SELEXPR (TOK_TABLE_OR_COL
>>> client_ip)) (TOK_SELEXPR (TOK_FUNCTION SUM (TOK_TABLE_OR_COL
>>> bytes_sent)) X)) (TOK_WHERE (and (AND (= (TOK_TABLE_OR_COL year) 2010)
>>> (= (TOK_TABLE_OR_COL month) 02)) (= (TOK_TABLE_OR_COL day) 17)))
>>> (TOK_GROUPBY (TOK_TABLE_OR_COL clientip_id) (TOK_TABLE_OR_COL
>>> client_ip))))
>>>
>>> STAGE DEPENDENCIES:
>>>  Stage-1 is a root stage
>>>  Stage-2 depends on stages: Stage-1
>>>  Stage-0 is a root stage
>>>
>>> STAGE PLANS:
>>>  Stage: Stage-1
>>>    Map Reduce
>>>      Alias -> Map Operator Tree:
>>>        pageviews
>>>          TableScan
>>>            alias: pageviews
>>>            Filter Operator
>>>              predicate:
>>>                  expr: (((UDFToDouble(year) = UDFToDouble(2010)) and
>>> (UDFToDouble(month) = UDFToDouble(2))) and (UDFToDouble(day) =
>>> UDFToDouble(17)))
>>>                  type: boolean
>>>              Common Join Operator
>>>                condition map:
>>>                     Inner Join 0 to 1
>>>                condition expressions:
>>>                  0 {clientip_id} {bytes_sent} {year} {month} {day}
>>>                  1 {client_ip}
>>>                keys:
>>>                  0
>>>                  1
>>>                outputColumnNames: _col13, _col17, _col22, _col23,
>>> _col24, _col26
>>>                Position of Big Table: 0
>>>                File Output Operator
>>>                  compressed: false
>>>                  GlobalTableId: 0
>>>                  table:
>>>                      input format:
>>> org.apache.hadoop.mapred.SequenceFileInputFormat
>>>                      output format:
>>> org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>>>      Local Work:
>>>        Map Reduce Local Work
>>>          Alias -> Map Local Tables:
>>>            client_ips
>>>              Fetch Operator
>>>                limit: -1
>>>          Alias -> Map Local Operator Tree:
>>>            client_ips
>>>              TableScan
>>>                alias: client_ips
>>>                Common Join Operator
>>>                  condition map:
>>>                       Inner Join 0 to 1
>>>                  condition expressions:
>>>                    0 {clientip_id} {bytes_sent} {year} {month} {day}
>>>                    1 {client_ip}
>>>                  keys:
>>>                    0
>>>                    1
>>>                  outputColumnNames: _col13, _col17, _col22, _col23,
>>> _col24, _col26
>>>                  Position of Big Table: 0
>>>                  File Output Operator
>>>                    compressed: false
>>>                    GlobalTableId: 0
>>>                    table:
>>>                        input format:
>>> org.apache.hadoop.mapred.SequenceFileInputFormat
>>>                        output format:
>>> org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>>>
>>>  Stage: Stage-2
>>>    Map Reduce
>>>      Alias -> Map Operator Tree:
>>>
>>> hdfs://nyhadoopname1.ops.about.com:8020/tmp/hive-ecapriolo/975920219/10002
>>>          Select Operator
>>>            expressions:
>>>                  expr: _col13
>>>                  type: int
>>>                  expr: _col17
>>>                  type: int
>>>                  expr: _col22
>>>                  type: string
>>>                  expr: _col23
>>>                  type: string
>>>                  expr: _col24
>>>                  type: string
>>>                  expr: _col26
>>>                  type: string
>>>            outputColumnNames: _col13, _col17, _col22, _col23, _col24, _col26
>>>            Filter Operator
>>>              predicate:
>>>                  expr: (((UDFToDouble(_col22) = UDFToDouble(2010))
>>> and (UDFToDouble(_col23) = UDFToDouble(2))) and (UDFToDouble(_col24) =
>>> UDFToDouble(17)))
>>>                  type: boolean
>>>              Select Operator
>>>                expressions:
>>>                      expr: _col13
>>>                      type: int
>>>                      expr: _col26
>>>                      type: string
>>>                      expr: _col17
>>>                      type: int
>>>                outputColumnNames: _col13, _col26, _col17
>>>                Group By Operator
>>>                  aggregations:
>>>                        expr: sum(_col17)
>>>                  keys:
>>>                        expr: _col13
>>>                        type: int
>>>                        expr: _col26
>>>                        type: string
>>>                  mode: hash
>>>                  outputColumnNames: _col0, _col1, _col2
>>>                  Reduce Output Operator
>>>                    key expressions:
>>>                          expr: _col0
>>>                          type: int
>>>                          expr: _col1
>>>                          type: string
>>>                    sort order: ++
>>>                    Map-reduce partition columns:
>>>                          expr: _col0
>>>                          type: int
>>>                          expr: _col1
>>>                          type: string
>>>                    tag: -1
>>>                    value expressions:
>>>                          expr: _col2
>>>                          type: bigint
>>>      Reduce Operator Tree:
>>>        Group By Operator
>>>          aggregations:
>>>                expr: sum(VALUE._col0)
>>>          keys:
>>>                expr: KEY._col0
>>>                type: int
>>>                expr: KEY._col1
>>>                type: string
>>>          mode: mergepartial
>>>          outputColumnNames: _col0, _col1, _col2
>>>          Select Operator
>>>            expressions:
>>>                  expr: _col0
>>>                  type: int
>>>                  expr: _col1
>>>                  type: string
>>>                  expr: _col2
>>>                  type: bigint
>>>            outputColumnNames: _col0, _col1, _col2
>>>            File Output Operator
>>>              compressed: false
>>>              GlobalTableId: 0
>>>              table:
>>>                  input format: org.apache.hadoop.mapred.TextInputFormat
>>>                  output format:
>>> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>>>
>>>  Stage: Stage-0
>>>    Fetch Operator
>>>      limit: -1
>>>
>>>
>>> Time taken: 4.511 seconds
>>>
>>> Q: Is the 1 GB client_ips table too large for a mapjoin?
>>> Memory is <value>-Xmx778m</value>. I could go higher, but I'm not sure I
>>> want to; it may have a cascading effect.
>>> Q: Is the table in a mapjoin held entirely in main memory, or is it more
>>> like a small database on each mapper?
>>>
>>> Any other hints? Thank you.
>>>
>>>
>>>
>>>
>>
>> Understood. A map join is not possible here. Really, 300s is a fine time
>> for my query.
>>
>> HIVE-917 won't work, I think. This is a star schema; the big table
>> needs to be joined with multiple tables, so we cannot choose one bucketing
>> that works for all of them.
>>
>> Has anyone ever considered doing the map join with Derby? That way
>> mapjoin would not be a main-memory operation.
>>
>>
>
>
>

@Ning

>> Map-join works with a persistent data structure starting from branch 0.5. We
>> will keep a cache using a HashMap in main memory, and you can specify the # of
>> rows to be put in the cache. If it is too large, data will be spilled to a
>> disk-based hash table.
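
If I'm reading that right, the relevant knob would be something like the
following (I'm guessing at the property name and default, so please correct me):

  set hive.mapjoin.cache.numrows=25000;  -- rows kept in the in-memory cache before spilling to the disk-based table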

So I have a problem: I am trying to join 5 large tables. My pageviews
table is partitioned by day (~6 GB), but I have 4 other tables that can be
about 1 GB each.

Currently joining the 5 tables throws OOM.

I see only two ways I can solve this:
1) break my joins up into stages, with each stage written to a Hive table
   (roughly sketched below)
2) rely on the improved mapjoin in 0.5 (my question below)
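
For (1), I mean something like this (the stage and dimension table/column
names are just placeholders):

  CREATE TABLE stage1 (clientip_id INT, client_ip STRING, bytes_sent INT);
  INSERT OVERWRITE TABLE stage1
  SELECT p.clientip_id, c.client_ip, p.bytes_sent
  FROM pageviews p JOIN client_ips c ON (p.clientip_id = c.id);

  -- then join stage1 against the next ~1 GB table (stage2 created similarly), and so on
  INSERT OVERWRITE TABLE stage2
  SELECT s.*, d.some_col
  FROM stage1 s JOIN dim2 d ON (s.dim2_id = d.id);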

@Ning (or anyone else):

With the enhancements to mapjoin in 0.5, will my 5-table join work
(theoretically) if I use mapjoin? Speed is not really an issue here,
but if joining 2+ tables this way is not practical in most cases, I need to
start planning for that now.
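
In other words, would something along these lines hold up? (dim_a/dim_b/dim_c
and their join columns are placeholders for my other ~1 GB tables)

  SELECT /*+ MAPJOIN(client_ips, dim_a, dim_b, dim_c) */
         p.clientip_id, c.client_ip, SUM(p.bytes_sent) AS X
  FROM pageviews p
  JOIN client_ips c ON (p.clientip_id = c.id)
  JOIN dim_a a ON (p.a_id = a.id)
  JOIN dim_b b ON (p.b_id = b.id)
  JOIN dim_c d ON (p.c_id = d.id)
  WHERE p.year = 2010 AND p.month = 02 AND p.day = 17
  GROUP BY p.clientip_id, c.client_ip;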

Thanks
