Performance mr iqm/quantile/median (qsort num red, qpick buffer size)

Our MR implementation of iqm/quantile/median via qsort and qpick showed
severe scalability bottlenecks on large data. The root cause was that the
out-of-core qpick reads one or two partitions into CP, depending on the
requested quantiles, and the partition sizes are determined by the number
of reducers of SortMR. This performance patch makes two improvements:
(1) it sets the number of reducers for SortMR (except for order) such
that a single partition has at most 10M records (~128MB), which bounds
the read for qpick, and (2) it uses a larger I/O buffer size of 64KB for
qpick, which makes the partition read slightly faster (~20%) than with
the 4KB default.
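
As a rough illustration of improvement (1), the reducer count can be
sketched as below. This is a standalone sketch, not the SystemML code:
the class and method names are hypothetical, and only the 10M-record
bound and the max() with the configured reducer count come from the
patch. Because the division truncates, the bound is approximate when
the row count is not a multiple of 10M, and actual partition sizes also
depend on how evenly the sort partitions the data.

```java
// Hypothetical helper illustrating the reducer sizing rule of this patch.
final class SortReducerSizing {
    // Upper bound on records per partition, as stated in the commit message.
    static final long MAX_RECORDS_PER_PARTITION = 10_000_000L;

    // Returns the larger of the configured reducer count and the count
    // needed to keep each partition at roughly <= 10M records.
    static int numReducers(int configured, long rlen) {
        return (int) Math.max(configured, rlen / MAX_RECORDS_PER_PARTITION);
    }

    public static void main(String[] args) {
        // 1B rows with 10 configured reducers: raised to 100 reducers.
        System.out.println(numReducers(10, 1_000_000_000L));
        // 50M rows with 10 configured reducers: the configured value stays.
        System.out.println(numReducers(10, 50_000_000L));
    }
}
```

With 10 configured reducers, 1B rows would yield 100 reducers (about
10M records each under uniform partitioning), whereas 50M rows keep the
configured 10.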

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/7f8716b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/7f8716b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/7f8716b6

Branch: refs/heads/master
Commit: 7f8716b641a45dbc77713710979cbf331d89ae8a
Parents: 240143b
Author: Matthias Boehm <[email protected]>
Authored: Sun Feb 14 01:53:42 2016 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Sun Feb 14 01:53:42 2016 -0800

----------------------------------------------------------------------
 .../org/apache/sysml/runtime/matrix/SortMR.java     | 16 +++++++++++-----
 .../apache/sysml/runtime/util/MapReduceTool.java    |  3 ++-
 2 files changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7f8716b6/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java b/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
index 218a95b..6502e57 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/SortMR.java
@@ -196,13 +196,19 @@ public class SortMR
            Path outpath = new Path(tmpOutput);
            FileOutputFormat.setOutputPath(job, outpath);           
            MapReduceTool.deleteFileIfExistOnHDFS(outpath, job);
-           
+
            //set number of reducers (1 if local mode)
-           if( InfrastructureAnalyzer.isLocalMode(job) )
-               job.setNumReduceTasks(1);
-           else
+               if( !InfrastructureAnalyzer.isLocalMode(job) ) {
                MRJobConfiguration.setNumReducers(job, numReducers, numReducers);
-           
+               //ensure partition size <= 10M records to avoid scalability bottlenecks
+               //on cp-side qpick instructions for quantile/iqm/median (~128MB)
+               if( !(getSortInstructionType(sortInst)==SortKeys.OperationTypes.Indexes) )
+                       job.setNumReduceTasks((int)Math.max(job.getNumReduceTasks(), rlen/10000000));
+           }
+           else //in case of local mode
+               job.setNumReduceTasks(1);
+               
+               
            //setup input/output format
            job.setInputFormat(SamplingSortMRInputFormat.class);
            SamplingSortMRInputFormat.setTargetKeyValueClasses(job, (Class<? extends WritableComparable>) outputInfo.outputKeyClass, outputInfo.outputValueClass);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/7f8716b6/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
index a142a2d..9b70106 100644
--- a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
+++ b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
@@ -620,7 +620,8 @@ public class MapReduceTool
                if(fileToRead==null)
                        throw new RuntimeException("cannot read partition "+currentPart);
                
-               FSDataInputStream currentStream=fs.open(fileToRead);
+               int buffsz = 64 * 1024;
+               FSDataInputStream currentStream=fs.open(fileToRead, buffsz);
            DoubleWritable readKey=new DoubleWritable();
            IntWritable readValue=new IntWritable();
            
