Dayue Gao created KYLIN-1656:
--------------------------------

             Summary: Improve performance of MRv2 engine by making each mapper 
handles a configured number of records
                 Key: KYLIN-1656
                 URL: https://issues.apache.org/jira/browse/KYLIN-1656
             Project: Kylin
          Issue Type: Improvement
          Components: Job Engine
    Affects Versions: v1.5.1, v1.5.0
            Reporter: Dayue Gao
            Assignee: Dayue Gao


In the current version of the MRv2 build engine, each mapper handles one block of 
the flat Hive table (stored as a sequence file). This causes two major problems:

# It's difficult for users to control the parallelism of mappers for each cube.
Users can change "dfs.block.size" in kylin_hive_conf.xml, but it's a global 
configuration and cannot be overridden using "override_kylin_properties" 
introduced in [KYLIN-1534|https://issues.apache.org/jira/browse/KYLIN-1534].
# Mapper execution may be skewed because the number of records per block is 
unevenly distributed.
This is the more severe problem, since the FactDistinctColumn and InMemCubing steps of 
MRv2 are very CPU intensive in the map task. To give a sense of how bad it is, 
one of our cubes' FactDistinctColumnStep takes ~100min in total, with an average 
mapper time of only 11min. This is because several skewed map tasks 
handled 10x more records than the average map task. And the InMemCubing steps 
failed because the skewed map tasks hit "mapred.task.timeout".

To prevent skew, *we'd better make each mapper handle a configurable 
number of records instead of a sequence file block.* We achieved this by 
adding a `RedistributeFlatHiveTableStep` right after 
"FlatHiveTableStep".

Here's what RedistributeFlatHiveTableStep does:
1. Run {{select count(1) from intermediate_table}} to determine the 
`input_rowcount` of this build.

2. Run {{insert overwrite table intermediate_table select * from intermediate_table distribute by rand()}} 
to distribute records evenly across reducers.

The number of reducers is set to "input_rowcount / mapper_input_rows", 
where `mapper_input_rows` is a new parameter that lets users specify how many 
records each mapper should handle. Since each reducer writes its 
records into one file, after RedistributeFlatHiveTableStep each sequence file 
of the FlatHiveTable contains around mapper_input_rows records. And since each 
mapper of the follow-up jobs handles one block of a sequence file, no mapper 
handles more than around mapper_input_rows records.
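
The reducer-count calculation can be sketched as follows. This is only an illustration, not the code in the patch: the function name {{plan_redistribute}} is hypothetical, rounding the division up (with a minimum of one reducer) is an assumption, and passing the count to Hive through {{set mapreduce.job.reduces=N}} is one plausible way to do it rather than necessarily what the step does.

```python
def plan_redistribute(input_rowcount, mapper_input_rows,
                      table="intermediate_table"):
    """Return (num_reducers, hive_statements) for the redistribute step.

    Hypothetical helper for illustration only.
    """
    if input_rowcount < 0:
        raise ValueError("input_rowcount must not be negative")
    if mapper_input_rows <= 0:
        raise ValueError("mapper_input_rows must be positive")

    # input_rowcount / mapper_input_rows, rounded up (assumption) so that no
    # reducer is expected to receive more than mapper_input_rows records.
    num_reducers = (input_rowcount + mapper_input_rows - 1) // mapper_input_rows
    # Assumption: always keep at least one reducer, even for an empty table.
    num_reducers = max(1, num_reducers)

    statements = [
        # Assumption: the reducer count is handed to Hive via this property.
        f"set mapreduce.job.reduces={num_reducers}",
        # The redistribute query quoted in the description above.
        f"insert overwrite table {table} "
        f"select * from {table} distribute by rand()",
    ]
    return num_reducers, statements


if __name__ == "__main__":
    reducers, hql = plan_redistribute(input_rowcount=1_000_000,
                                      mapper_input_rows=300_000)
    print(reducers)
    print(";\n".join(hql) + ";")
```

Because {{distribute by rand()}} assigns rows randomly, each output file holds only approximately the same number of rows, which is why the per-file count is "around" mapper_input_rows rather than exact.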

The added RedistributeFlatHiveTableStep usually takes little time 
compared to the other steps, but the benefit is substantial. Here is the 
performance improvement we saw:

|| cube || FactDistinctColumn before || RedistributeFlatHiveTableStep || FactDistinctColumn after ||
| case#1 | 51.78min | 8.40min | 13.06min |
| case#2 | 95.65min | 2.46min | 26.37min |
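
Counting the cost of the redistribute step itself, the net gain on the FactDistinctColumn step works out to roughly 2.4x for case#1 and 3.3x for case#2. The arithmetic, as a small sketch using only the numbers from the table (the helper name {{net_speedup}} is made up for illustration):

```python
# Timings in minutes, copied from the table above.
cases = {
    "case#1": {"before": 51.78, "redistribute": 8.40, "after": 13.06},
    "case#2": {"before": 95.65, "redistribute": 2.46, "after": 26.37},
}


def net_speedup(before, redistribute, after):
    """Old step time divided by the new time including the extra step."""
    return before / (redistribute + after)


for name, t in cases.items():
    total = t["redistribute"] + t["after"]
    print(f"{name}: {t['before']:.2f}min -> {total:.2f}min "
          f"({net_speedup(**t):.2f}x)")
```

This compares only the FactDistinctColumn step; it says nothing about the total build time.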

And since mapper_input_rows is a Kylin configuration, users can override it for 
each cube.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
