Repository: incubator-systemml
Updated Branches:
  refs/heads/master 2bf61b476 -> cd5499c54


[SYSTEMML-1543] Fix parfor spark result merge w/ many result files

So far, the parfor spark result merge essentially created RDDs for all
result files, concatenated them via union into one RDD, and finally
executed the actual merge operation. For many parfor tasks, i.e., many
result files, this led to stack overflow errors because the lineage of
binary union operations was too deep. We now construct a single job
configuration with all filenames, and hence a single input RDD for result
merge, which exhibits much better scalability.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/cd5499c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/cd5499c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/cd5499c5

Branch: refs/heads/master
Commit: cd5499c54895c8745884cd2c9d2476f3df46606a
Parents: 2bf61b4
Author: Matthias Boehm <[email protected]>
Authored: Tue Apr 18 22:09:49 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Tue Apr 18 22:09:49 2017 -0700

----------------------------------------------------------------------
 .../parfor/ResultMergeRemoteMR.java             |   3 +-
 .../parfor/ResultMergeRemoteSpark.java          | 108 ++++++++++---------
 2 files changed, 61 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cd5499c5/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
index b285a13..98a063e 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
@@ -175,8 +175,7 @@ public class ResultMergeRemoteMR extends ResultMerge
                String jobname = "ParFor-RMMR";
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                
-               JobConf job;
-               job = new JobConf( ResultMergeRemoteMR.class );
+               JobConf job = new JobConf( ResultMergeRemoteMR.class );
                job.setJobName(jobname+_pfid);
 
                //maintain dml script counters

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cd5499c5/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
index 49293e4..d783977 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
@@ -20,6 +20,9 @@
 package org.apache.sysml.runtime.controlprogram.parfor;
 
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.spark.api.java.JavaPairRDD;
 
 import org.apache.sysml.api.DMLScript;
@@ -30,6 +33,7 @@ import 
org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.spark.data.RDDObject;
+import 
org.apache.sysml.runtime.instructions.spark.functions.CopyBlockPairFunction;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.MatrixFormatMetaData;
@@ -127,73 +131,81 @@ public class ResultMergeRemoteSpark extends ResultMerge
 
                RDDObject ret = null;
                
-           //determine degree of parallelism
+               //determine degree of parallelism
                int numRed = (int)determineNumReducers(rlen, clen, brlen, 
bclen, _numReducers);
-       
+               
                //sanity check for empty src files
                if( inputs == null || inputs.length==0  )
                        throw new DMLRuntimeException("Execute merge should 
never be called with no inputs.");
                
                try
                {
-                   //Step 1: union over all results
-                   JavaPairRDD<MatrixIndexes, MatrixBlock> rdd = 
(JavaPairRDD<MatrixIndexes, MatrixBlock>) 
-                               sec.getRDDHandleForMatrixObject(_inputs[0], 
InputInfo.BinaryBlockInputInfo);
-                   for( int i=1; i<_inputs.length; i++ ) {
-                           JavaPairRDD<MatrixIndexes, MatrixBlock> rdd2 = 
(JavaPairRDD<MatrixIndexes, MatrixBlock>) 
-                                       
sec.getRDDHandleForMatrixObject(_inputs[i], InputInfo.BinaryBlockInputInfo);
-                           rdd = rdd.union(rdd2);
-                   }
-               
-                   //Step 2a: merge with compare
-                   JavaPairRDD<MatrixIndexes, MatrixBlock> out = null;
-                   if( withCompare )
-                   {
-                       JavaPairRDD<MatrixIndexes, MatrixBlock> compareRdd = 
(JavaPairRDD<MatrixIndexes, MatrixBlock>) 
-                                       
sec.getRDDHandleForMatrixObject(compare, InputInfo.BinaryBlockInputInfo);
-                           
-                       //merge values which differ from compare values
-                       ResultMergeRemoteSparkWCompare cfun = new 
ResultMergeRemoteSparkWCompare();
-                       out = rdd.groupByKey(numRed) //group all result blocks 
per key
-                                .join(compareRdd)   //join compare block and 
result blocks 
-                                .mapToPair(cfun);   //merge result blocks w/ 
compare
-                   }
-                   //Step 2b: merge without compare
-                   else
-                   {
-                       //direct merge in any order (disjointness guaranteed)
-                       out = RDDAggregateUtils.mergeByKey(rdd, false);
-                   }
+                       //note: initial implementation via union over all 
result rdds discarded due to 
+                       //stack overflow errors with many parfor tasks, and 
thus many rdds
+                       
+                       //Step 1: construct input rdd from all result files of 
parfor workers
+                       //a) construct job conf with all files
+                       InputInfo ii = InputInfo.BinaryBlockInputInfo;
+                       JobConf job = new JobConf( ResultMergeRemoteMR.class );
+                       job.setJobName(jobname);
+                       job.setInputFormat(ii.inputFormatClass);
+                       Path[] paths = new Path[ inputs.length ];
+                       for(int i=0; i<paths.length; i++) {
+                               //ensure presence of hdfs if inputs come from 
memory
+                               if( inputs[i].isDirty() )
+                                       inputs[i].exportData();
+                               paths[i] = new Path( inputs[i].getFileName() );
+                       }
+                       FileInputFormat.setInputPaths(job, paths);
+                       
+                       //b) create rdd from input files w/ deep copy of keys 
and blocks
+                       JavaPairRDD<MatrixIndexes, MatrixBlock> rdd = 
sec.getSparkContext()
+                                       .hadoopRDD(job, ii.inputFormatClass, 
ii.inputKeyClass, ii.inputValueClass)
+                                       .mapPartitionsToPair(new 
CopyBlockPairFunction(true), true);
+                       
+                       //Step 2a: merge with compare
+                       JavaPairRDD<MatrixIndexes, MatrixBlock> out = null;
+                       if( withCompare )
+                       {
+                               JavaPairRDD<MatrixIndexes, MatrixBlock> 
compareRdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>) 
+                                               
sec.getRDDHandleForMatrixObject(compare, InputInfo.BinaryBlockInputInfo);
+                       
+                               //merge values which differ from compare values
+                               ResultMergeRemoteSparkWCompare cfun = new 
ResultMergeRemoteSparkWCompare();
+                               out = rdd.groupByKey(numRed) //group all result 
blocks per key
+                                       .join(compareRdd)   //join compare 
block and result blocks 
+                                       .mapToPair(cfun);   //merge result 
blocks w/ compare
+                       }
+                       //Step 2b: merge without compare
+                       else {
+                               //direct merge in any order (disjointness 
guaranteed)
+                               out = RDDAggregateUtils.mergeByKey(rdd, false);
+                       }
                    
-                   //Step 3: create output rdd handle w/ lineage
-                   ret = new RDDObject(out, varname);
-                   for( int i=0; i<_inputs.length; i++ ) {
-                       //child rdd handles guaranteed to exist
-                       RDDObject child = _inputs[i].getRDDHandle();
-                               ret.addLineageChild(child);
-                   }
+                       //Step 3: create output rdd handle w/ lineage
+                       ret = new RDDObject(out, varname);
+                       if( withCompare )
+                               ret.addLineageChild(compare.getRDDHandle());
                }
-               catch( Exception ex )
-               {
+               catch( Exception ex ) {
                        throw new DMLRuntimeException(ex);
                }           
                
                //maintain statistics
-           Statistics.incrementNoOfCompiledSPInst();
-           Statistics.incrementNoOfExecutedSPInst();
-           if( DMLScript.STATISTICS ){
+               Statistics.incrementNoOfCompiledSPInst();
+               Statistics.incrementNoOfExecutedSPInst();
+               if( DMLScript.STATISTICS ){
                        Statistics.maintainCPHeavyHitters(jobname, 
System.nanoTime()-t0);
                }
-           
+               
                return ret;
        }
 
-       private int determineNumReducers(long rlen, long clen, int brlen, int 
bclen, long numRed)
-       {
+       private int determineNumReducers(long rlen, long clen, int brlen, int 
bclen, long numRed) {
                //set the number of mappers and reducers 
-           long reducerGroups = Math.max(rlen/brlen,1) * Math.max(clen/bclen, 
1);
+               long reducerGroups = Math.max(rlen/brlen,1) * 
Math.max(clen/bclen, 1);
                int ret = (int)Math.min( numRed, reducerGroups );
-           
-           return ret;         
+               
+               return ret;     
        }
 }

Reply via email to