[
https://issues.apache.org/jira/browse/KYLIN-1656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Shaofeng SHI resolved KYLIN-1656.
---------------------------------
Resolution: Fixed
> Improve performance of MRv2 engine by making each mapper handles a configured
> number of records
> -----------------------------------------------------------------------------------------------
>
> Key: KYLIN-1656
> URL: https://issues.apache.org/jira/browse/KYLIN-1656
> Project: Kylin
> Issue Type: Improvement
> Components: Job Engine
> Affects Versions: v1.5.0, v1.5.1
> Reporter: Dayue Gao
> Assignee: Dayue Gao
> Fix For: v1.5.3
>
> Attachments: KYLIN-1656.patch
>
>
> In the current version of MRv2 build engine, each mapper handles one block of
> the flat hive table (stored in sequence file). This has two major problems:
> # It's difficult for user to control the parallelism of mappers for each cube.
> User can change "dfs.block.size" in kylin_hive_conf.xml, however it's a
> global configuration and cannot be override using "override_kylin_properties"
> introduced in [KYLIN-1534|https://issues.apache.org/jira/browse/KYLIN-1534].
> # May encounter mapper execution skew due to a skew distribution of each
> block's records number.
> This is a more severe problem since FactDistinctColumn and InMemCubing step
> of MRv2 is very cpu intensive in map task. To give you a sense of how bad it
> is, one of our cube's FactDistinctColumnStep takes ~100min in total with
> average mapper time only 11min. This is because there exists several skewed
> map tasks which handled 10x records than average map task. And the
> InMemCubing steps failed because the skewed mapper tasks hit
> "mapred.task.timeout".
> To avoid skew to happen, *we'd better make each mapper handles a configurable
> number of records instead of handles a sequence file block.* The way we
> achieved this is to add a `RedistributeFlatHiveTableStep` right after
> "FlatHiveTableStep".
> Here's what RedistributeFlatHiveTableStep do:
> 1. we run a {{select count(1) from intermediate_table}} to determine the
> `input_rowcount` of this build
> 2. we run a {{insert overwrite table intermediate_table select * from
> intermediate_table distribute by rand()}} to evenly distribute records to
> reducers.
> The number of reducers is specified as "input_rowcount / mapper_input_rows"
> where `mapper_input_rows` is a new parameter for user to specify how many
> records each mapper should handle. Since each reducer will write out its
> records into one file, we're guaranteed that after
> RedistributeFlatHiveTableStep, each sequence file of FlatHiveTable contains
> around mapper_input_rows. And since the followed up job's mapper handles one
> block of each sequence file, they won't handle more than mapper_input_rows.
> The added RedistributeFlatHiveTableStep usually takes a small amount of time
> compared to other steps, but the benefit it brings is remarkable. Here's what
> performance improvement we saw:
> || cube || FactDistinctColumn before || RedistributeFlatHiveTableStep ||
> FactDistinctColumn after||
> | case#1 | 51.78min | 8.40min | 13.06min |
> | case#2 | 95.65min | 2.46min | 26.37min |
> And since mapper_input_rows is a kylin configuration, user can override it
> for each cube.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)