Dayue Gao created KYLIN-1656:
--------------------------------
Summary: Improve performance of MRv2 engine by making each mapper
handles a configured number of records
Key: KYLIN-1656
URL: https://issues.apache.org/jira/browse/KYLIN-1656
Project: Kylin
Issue Type: Improvement
Components: Job Engine
Affects Versions: v1.5.1, v1.5.0
Reporter: Dayue Gao
Assignee: Dayue Gao
In the current version of the MRv2 build engine, each mapper handles one block of
the flat Hive table (stored as a sequence file). This has two major problems:
# It is difficult for users to control the mapper parallelism for each cube.
Users can change "dfs.block.size" in kylin_hive_conf.xml, but that is a global
configuration and cannot be overridden using "override_kylin_properties"
introduced in [KYLIN-1534|https://issues.apache.org/jira/browse/KYLIN-1534].
# Mapper execution can be skewed because the number of records per block is
unevenly distributed.
The skew is the more severe problem, since the FactDistinctColumn and InMemCubing
steps of MRv2 are very CPU intensive in the map task. To give you a sense of how
bad it is, one of our cubes' FactDistinctColumnStep takes ~100min in total with
an average mapper time of only 11min. This is because there are several skewed
map tasks that handled 10x as many records as the average map task. The
InMemCubing step also failed because the skewed map tasks hit
"mapred.task.timeout".
To avoid the skew, *we'd better make each mapper handle a configurable
number of records instead of a sequence file block.* The way we
achieved this is to add a `RedistributeFlatHiveTableStep` right after
"FlatHiveTableStep".
Here's what RedistributeFlatHiveTableStep does:
1. we run a {{select count(1) from intermediate_table}} to determine the
`input_rowcount` of this build
2. we run a {{insert overwrite table intermediate_table select * from
intermediate_table distribute by rand()}} to evenly distribute records to
reducers.
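As a rough illustration of why distributing by a random key evens out the per-reducer record counts, here is a small simulation. It is not Kylin or Hive code: the function name, the row count and the reducer count are made up for the example, and it assumes each row is assigned to a reducer uniformly at random, which only approximates what {{distribute by rand()}} does.

```python
import random
from collections import Counter


def simulate_distribute_by_rand(num_rows, num_reducers, seed=0):
    """Assign each row to a uniformly random reducer and return per-reducer counts.

    Hypothetical helper for illustration only; it approximates the effect of
    'distribute by rand()' and does not model Hive's actual partitioning.
    """
    rng = random.Random(seed)
    counts = Counter(rng.randrange(num_reducers) for _ in range(num_rows))
    return [counts.get(r, 0) for r in range(num_reducers)]


if __name__ == "__main__":
    counts = simulate_distribute_by_rand(num_rows=100_000, num_reducers=10)
    average = sum(counts) / len(counts)
    # With random assignment, the largest reducer stays close to the average.
    print("min:", min(counts), "max:", max(counts), "avg:", average)
```

With many rows per reducer, the largest count stays within a few percent of the average, which is the property the redistribute step relies on.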
The number of reducers is set to "input_rowcount / mapper_input_rows",
where `mapper_input_rows` is a new parameter that lets the user specify how many
records each mapper should handle. Since each reducer writes its
records into one file, after RedistributeFlatHiveTableStep each sequence file
of the FlatHiveTable contains around mapper_input_rows records. And since each
mapper of the follow-up jobs handles one block of a sequence file, it won't
handle more than about mapper_input_rows records.
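The two steps and the reducer calculation can be sketched as follows. This is a minimal sketch, not the actual step implementation: the function name `plan_redistribution` is hypothetical, rounding the division up (with a minimum of one reducer) is an assumption, and setting the reducer count through the Hadoop property "mapreduce.job.reduces" is one possible way to do it that the description above does not confirm.

```python
def plan_redistribution(input_rowcount, mapper_input_rows, table="intermediate_table"):
    """Return (num_reducers, hive_statements) for the redistribute step.

    Hypothetical helper: input_rowcount is the result of
    'select count(1) from <table>', mapper_input_rows is the configured
    number of records each mapper should handle.
    """
    if mapper_input_rows <= 0:
        raise ValueError("mapper_input_rows must be positive")
    if input_rowcount < 0:
        raise ValueError("input_rowcount must not be negative")

    # Assumption: round up so that no output file exceeds the target on
    # average, and always keep at least one reducer.
    num_reducers = max(1, (input_rowcount + mapper_input_rows - 1) // mapper_input_rows)

    statements = [
        # Assumption: the reducer count is passed to Hive as a session setting.
        f"set mapreduce.job.reduces={num_reducers};",
        f"insert overwrite table {table} "
        f"select * from {table} distribute by rand();",
    ]
    return num_reducers, statements


if __name__ == "__main__":
    reducers, hql = plan_redistribution(input_rowcount=10_000_000,
                                        mapper_input_rows=500_000)
    print(reducers)
    print("\n".join(hql))
```

For example, 10,000,000 input rows with mapper_input_rows set to 500,000 yields 20 reducers, hence about 20 output files of roughly 500,000 records each.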
The added RedistributeFlatHiveTableStep usually takes a small amount of time
compared to other steps, but the benefit it brings is remarkable. Here's the
performance improvement we saw:
|| cube || FactDistinctColumn before || RedistributeFlatHiveTableStep || FactDistinctColumn after ||
| case#1 | 51.78min | 8.40min | 13.06min |
| case#2 | 95.65min | 2.46min | 26.37min |
And since mapper_input_rows is a Kylin configuration, users can override it for
each cube.