A 1GB small table is usually too large for map-side joins. If the raw data
is 1GB, it can be roughly 10x larger once it is read into main memory as Java
objects. Our default value for the small table is 10MB.
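To make the arithmetic concrete, here is a rough back-of-the-envelope check
(the 10x expansion factor is only a rule of thumb, and the 778MB heap comes
from the mapred.child.java.opts setting quoted below):

    // Rough estimate: can the small table fit in one mapper's heap?
    // All numbers here are assumptions/estimates, not measurements.
    public class MapJoinSizeCheck {
        public static void main(String[] args) {
            long rawSmallTableBytes = 1L << 30;            // ~1 GB of raw data
            long expansionFactor    = 10;                  // Java object overhead, rule of thumb
            long estimatedInMemory  = rawSmallTableBytes * expansionFactor;
            long mapperHeapBytes    = 778L * 1024 * 1024;  // -Xmx778m

            System.out.printf("estimated in-memory size: %d MB%n", estimatedInMemory >> 20);
            System.out.printf("mapper heap:              %d MB%n", mapperHeapBytes >> 20);
            System.out.println(estimatedInMemory < mapperHeapBytes
                ? "might fit: a map-side join could work"
                : "will not fit: expect OOM with a map-side join");
        }
    }

A 1GB table that expands to roughly 10GB of objects has no chance of fitting
in a 778MB mapper heap, which matches the OOM you are seeing.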
Another factor in deciding whether to use a map-side join is the number of rows
in the small table. If it is too large, each mapper will spend a long time
processing the join (each mapper reads the whole small table into an in-memory
hash table and then joins its split of the large table against it).
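Conceptually, each mapper does something like the sketch below. This is not
Hive's actual implementation (the stack trace in your mail shows the real
mapjoin going through Hive's JDBM-based storage), and the table layout and
field names are made up for illustration:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    // Minimal sketch of a map-side (hash) join over two tab-separated inputs:
    // a small table of (id, client_ip) and a split of a large table of
    // (clientip_id, bytes_sent). Not Hive code; it only shows the idea.
    public class MapSideJoinSketch {

        // Step 1: every mapper loads the ENTIRE small table into a hash table.
        static Map<Integer, String> loadSmallTable(Iterable<String> smallTableLines) {
            Map<Integer, String> ipById = new HashMap<Integer, String>();
            for (String line : smallTableLines) {
                String[] f = line.split("\t");
                ipById.put(Integer.parseInt(f[0]), f[1]);   // id -> client_ip
            }
            return ipById;
        }

        // Step 2: stream the mapper's split of the large table and probe the
        // hash table for each row; the join itself needs no shuffle or reduce.
        static void joinSplit(Iterable<String> largeTableSplit,
                              Map<Integer, String> ipById) {
            for (String line : largeTableSplit) {
                String[] f = line.split("\t");
                int clientipId = Integer.parseInt(f[0]);
                long bytesSent = Long.parseLong(f[1]);
                String clientIp = ipById.get(clientipId);
                if (clientIp != null) {                     // inner join: drop non-matches
                    System.out.println(clientipId + "\t" + clientIp + "\t" + bytesSent);
                }
            }
        }

        public static void main(String[] args) {
            Map<Integer, String> ipById =
                loadSmallTable(Arrays.asList("1\t10.0.0.1", "2\t10.0.0.2"));
            joinSplit(Arrays.asList("1\t512", "3\t128", "2\t2048"), ipById);
        }
    }

Because the whole hash table has to live on the heap of every single mapper,
both the byte size and the row count of the small table matter.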
Thanks,
Ning
On Feb 18, 2010, at 2:45 PM, Edward Capriolo wrote:
> I have Hive 0.4.1-rc2. My query runs in "Time taken: 312.956 seconds"
> using the regular map/reduce join. I was interested in using mapjoin,
> but I get an OOM error.
>
> hive>
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at
> org.apache.hadoop.hive.ql.util.jdbm.recman.RecordFile.getNewNode(RecordFile.java:369)
>
> My pageviews table is 8GB and my client_ips table is ~1GB
> <property>
> <name>mapred.child.java.opts</name>
> <value>-Xmx778m</value>
> </property>
>
> [ecapri...@nyhadoopdata10 ~]$ hive
> Hive history
> file=/tmp/ecapriolo/hive_job_log_ecapriolo_201002181717_253155276.txt
> hive> explain Select /*+ MAPJOIN( client_ips )*/clientip_id,client_ip,
> SUM(bytes_sent) as X from pageviews join client_ips on
> pageviews.clientip_id=client_ips.id where year=2010 AND month=02 and
> day=17 group by clientip_id,client_ip
>> ;
> OK
> ABSTRACT SYNTAX TREE:
> (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF pageviews) (TOK_TABREF
> client_ips) (= (. (TOK_TABLE_OR_COL pageviews) clientip_id) (.
> (TOK_TABLE_OR_COL client_ips) id)))) (TOK_INSERT (TOK_DESTINATION
> (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT
> TOK_MAPJOIN (TOK_HINTARGLIST client_ips))) (TOK_SELEXPR
> (TOK_TABLE_OR_COL clientip_id)) (TOK_SELEXPR (TOK_TABLE_OR_COL
> client_ip)) (TOK_SELEXPR (TOK_FUNCTION SUM (TOK_TABLE_OR_COL
> bytes_sent)) X)) (TOK_WHERE (and (AND (= (TOK_TABLE_OR_COL year) 2010)
> (= (TOK_TABLE_OR_COL month) 02)) (= (TOK_TABLE_OR_COL day) 17)))
> (TOK_GROUPBY (TOK_TABLE_OR_COL clientip_id) (TOK_TABLE_OR_COL
> client_ip))))
>
> STAGE DEPENDENCIES:
> Stage-1 is a root stage
> Stage-2 depends on stages: Stage-1
> Stage-0 is a root stage
>
> STAGE PLANS:
> Stage: Stage-1
> Map Reduce
> Alias -> Map Operator Tree:
> pageviews
> TableScan
> alias: pageviews
> Filter Operator
> predicate:
> expr: (((UDFToDouble(year) = UDFToDouble(2010)) and
> (UDFToDouble(month) = UDFToDouble(2))) and (UDFToDouble(day) =
> UDFToDouble(17)))
> type: boolean
> Common Join Operator
> condition map:
> Inner Join 0 to 1
> condition expressions:
> 0 {clientip_id} {bytes_sent} {year} {month} {day}
> 1 {client_ip}
> keys:
> 0
> 1
> outputColumnNames: _col13, _col17, _col22, _col23,
> _col24, _col26
> Position of Big Table: 0
> File Output Operator
> compressed: false
> GlobalTableId: 0
> table:
> input format:
> org.apache.hadoop.mapred.SequenceFileInputFormat
> output format:
> org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
> Local Work:
> Map Reduce Local Work
> Alias -> Map Local Tables:
> client_ips
> Fetch Operator
> limit: -1
> Alias -> Map Local Operator Tree:
> client_ips
> TableScan
> alias: client_ips
> Common Join Operator
> condition map:
> Inner Join 0 to 1
> condition expressions:
> 0 {clientip_id} {bytes_sent} {year} {month} {day}
> 1 {client_ip}
> keys:
> 0
> 1
> outputColumnNames: _col13, _col17, _col22, _col23,
> _col24, _col26
> Position of Big Table: 0
> File Output Operator
> compressed: false
> GlobalTableId: 0
> table:
> input format:
> org.apache.hadoop.mapred.SequenceFileInputFormat
> output format:
> org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>
> Stage: Stage-2
> Map Reduce
> Alias -> Map Operator Tree:
>
> hdfs://nyhadoopname1.ops.about.com:8020/tmp/hive-ecapriolo/975920219/10002
> Select Operator
> expressions:
> expr: _col13
> type: int
> expr: _col17
> type: int
> expr: _col22
> type: string
> expr: _col23
> type: string
> expr: _col24
> type: string
> expr: _col26
> type: string
> outputColumnNames: _col13, _col17, _col22, _col23, _col24, _col26
> Filter Operator
> predicate:
> expr: (((UDFToDouble(_col22) = UDFToDouble(2010))
> and (UDFToDouble(_col23) = UDFToDouble(2))) and (UDFToDouble(_col24) =
> UDFToDouble(17)))
> type: boolean
> Select Operator
> expressions:
> expr: _col13
> type: int
> expr: _col26
> type: string
> expr: _col17
> type: int
> outputColumnNames: _col13, _col26, _col17
> Group By Operator
> aggregations:
> expr: sum(_col17)
> keys:
> expr: _col13
> type: int
> expr: _col26
> type: string
> mode: hash
> outputColumnNames: _col0, _col1, _col2
> Reduce Output Operator
> key expressions:
> expr: _col0
> type: int
> expr: _col1
> type: string
> sort order: ++
> Map-reduce partition columns:
> expr: _col0
> type: int
> expr: _col1
> type: string
> tag: -1
> value expressions:
> expr: _col2
> type: bigint
> Reduce Operator Tree:
> Group By Operator
> aggregations:
> expr: sum(VALUE._col0)
> keys:
> expr: KEY._col0
> type: int
> expr: KEY._col1
> type: string
> mode: mergepartial
> outputColumnNames: _col0, _col1, _col2
> Select Operator
> expressions:
> expr: _col0
> type: int
> expr: _col1
> type: string
> expr: _col2
> type: bigint
> outputColumnNames: _col0, _col1, _col2
> File Output Operator
> compressed: false
> GlobalTableId: 0
> table:
> input format: org.apache.hadoop.mapred.TextInputFormat
> output format:
> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>
> Stage: Stage-0
> Fetch Operator
> limit: -1
>
>
> Time taken: 4.511 seconds
>
> Q: Is the 1GB client_ips table too large for a mapjoin?
> Memory is <value>-Xmx778m</value>. I could go higher, but I'm not sure I
> want to; it may have a cascading effect.
> Q: Is the table in a mapjoin kept entirely in main memory, or is it more
> like a small database on each mapper?
>
> Any other hints? Thank you.