Performance: MR iqm/quantile/median (qsort number of reducers, qpick buffer size). Our MR implementation of iqm/quantile/median via qsort and qpick showed severe scalability bottlenecks on large data. The root cause was that the out-of-core qpick reads -- depending on the required quantiles -- one or two partitions into CP, where the partition sizes are determined by the number of reducers of SortMR. This performance patch makes two improvements: (1) it sets the number of reducers for SortMR (except for order) such that a single partition has at most 10M records (~128MB), which bounds the read for qpick, and (2) it uses a larger I/O buffer size of 64KB for qpick, which makes the partition read slightly faster (~20%) compared to the 4KB default.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/7f8716b6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/7f8716b6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/7f8716b6 Branch: refs/heads/master Commit: 7f8716b641a45dbc77713710979cbf331d89ae8a Parents: 240143b Author: Matthias Boehm <[email protected]> Authored: Sun Feb 14 01:53:42 2016 -0800 Committer: Matthias Boehm <[email protected]> Committed: Sun Feb 14 01:53:42 2016 -0800 ---------------------------------------------------------------------- .../org/apache/sysml/runtime/matrix/SortMR.java | 16 +++++++++++----- .../apache/sysml/runtime/util/MapReduceTool.java | 3 ++- 2 files changed, 13 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7f8716b6/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java b/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java index 218a95b..6502e57 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java @@ -196,13 +196,19 @@ public class SortMR Path outpath = new Path(tmpOutput); FileOutputFormat.setOutputPath(job, outpath); MapReduceTool.deleteFileIfExistOnHDFS(outpath, job); - + //set number of reducers (1 if local mode) - if( InfrastructureAnalyzer.isLocalMode(job) ) - job.setNumReduceTasks(1); - else + if( !InfrastructureAnalyzer.isLocalMode(job) ) { MRJobConfiguration.setNumReducers(job, numReducers, numReducers); - + //ensure partition size <= 10M records to avoid scalability bottlenecks + //on cp-side qpick instructions for quantile/iqm/median (~128MB) + if( 
!(getSortInstructionType(sortInst)==SortKeys.OperationTypes.Indexes) ) + job.setNumReduceTasks((int)Math.max(job.getNumReduceTasks(), rlen/10000000)); + } + else //in case of local mode + job.setNumReduceTasks(1); + + //setup input/output format job.setInputFormat(SamplingSortMRInputFormat.class); SamplingSortMRInputFormat.setTargetKeyValueClasses(job, (Class<? extends WritableComparable>) outputInfo.outputKeyClass, outputInfo.outputValueClass); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7f8716b6/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java index a142a2d..9b70106 100644 --- a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java +++ b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java @@ -620,7 +620,8 @@ public class MapReduceTool if(fileToRead==null) throw new RuntimeException("cannot read partition "+currentPart); - FSDataInputStream currentStream=fs.open(fileToRead); + int buffsz = 64 * 1024; + FSDataInputStream currentStream=fs.open(fileToRead, buffsz); DoubleWritable readKey=new DoubleWritable(); IntWritable readValue=new IntWritable();
