Repository: incubator-systemml
Updated Branches:
  refs/heads/master 2bf61b476 -> cd5499c54
[SYSTEMML-1543] Fix parfor spark result merge w/ many result files

So far, the parfor spark result merge essentially created RDDs for all
result files, concatenated them via union into one RDD, and finally
executed the actual merge operation. For many parfor tasks, i.e., many
result files, this led to stack overflow errors because the lineage of
binary union operations grew too large. We now construct a single job
configuration with all filenames, and hence a single input RDD for the
result merge, which exhibits much better scalability.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/cd5499c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/cd5499c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/cd5499c5

Branch: refs/heads/master
Commit: cd5499c54895c8745884cd2c9d2476f3df46606a
Parents: 2bf61b4
Author: Matthias Boehm <[email protected]>
Authored: Tue Apr 18 22:09:49 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Tue Apr 18 22:09:49 2017 -0700
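To see why the union-based approach breaks down, consider a minimal
sketch (class and method names like UnionVsSingleInputRdd, viaUnion,
and viaSingleJobConf are hypothetical, not part of this commit): each
rdd.union(...) wraps both operands in a new UnionRDD, so folding N
result files produces a lineage chain of depth N, and Spark's recursive
traversal of that lineage can overflow the stack for large N. Reading
all files through one Hadoop job configuration keeps the lineage depth
constant. The sketch assumes Spark 2.x and binary-block matrices stored
as sequence files; the actual commit derives input format and key/value
classes from InputInfo.BinaryBlockInputInfo instead.

import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;

public class UnionVsSingleInputRdd
{
	//old pattern: one rdd per result file, folded via binary union;
	//every union adds a lineage level, so depth grows with #files
	public static JavaPairRDD<MatrixIndexes,MatrixBlock> viaUnion(
		JavaSparkContext sc, List<String> files )
	{
		JavaPairRDD<MatrixIndexes,MatrixBlock> rdd = sc.sequenceFile(
			files.get(0), MatrixIndexes.class, MatrixBlock.class);
		for( int i=1; i<files.size(); i++ )
			rdd = rdd.union(sc.sequenceFile(files.get(i),
				MatrixIndexes.class, MatrixBlock.class));
		return rdd; //lineage depth O(#files)
	}

	//new pattern: register all files on a single job conf and read them
	//through one input rdd, keeping the lineage depth constant
	@SuppressWarnings("unchecked")
	public static JavaPairRDD<MatrixIndexes,MatrixBlock> viaSingleJobConf(
		JavaSparkContext sc, List<String> files )
	{
		JobConf job = new JobConf();
		Path[] paths = files.stream().map(Path::new).toArray(Path[]::new);
		FileInputFormat.setInputPaths(job, paths); //all result files at once
		return sc.hadoopRDD(job, SequenceFileInputFormat.class,
			MatrixIndexes.class, MatrixBlock.class);
	}
}

For a handful of inputs the union variant is harmless; the lineage depth
only becomes a problem at the scale of many parfor tasks, which is
exactly the scenario this commit targets.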
----------------------------------------------------------------------
 .../parfor/ResultMergeRemoteMR.java             |   3 +-
 .../parfor/ResultMergeRemoteSpark.java          | 108 ++++++++++---------
 2 files changed, 61 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cd5499c5/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
index b285a13..98a063e 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
@@ -175,8 +175,7 @@ public class ResultMergeRemoteMR extends ResultMerge
 		String jobname = "ParFor-RMMR";
 		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 		
-		JobConf job;
-		job = new JobConf( ResultMergeRemoteMR.class );
+		JobConf job = new JobConf( ResultMergeRemoteMR.class );
 		job.setJobName(jobname+_pfid);
 		
 		//maintain dml script counters
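The Spark-side rewrite below applies the single-input-RDD scheme. One
subtlety worth noting: Hadoop record readers reuse their Writable
key/value instances across records, so blocks read via hadoopRDD must be
deep-copied before they are grouped or cached; the commit does this with
CopyBlockPairFunction(true). As a rough stand-in (the deepCopy helper
and DeepCopyExample class are hypothetical; the real class lives in
org.apache.sysml.runtime.instructions.spark.functions, and this sketch
assumes Spark 2.x and SystemML's MatrixBlock.copy), the idea is:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;

import scala.Tuple2;

public class DeepCopyExample
{
	//materialize each partition with fresh key/block objects so that
	//records no longer alias the record reader's reused writables
	public static JavaPairRDD<MatrixIndexes,MatrixBlock> deepCopy(
		JavaPairRDD<MatrixIndexes,MatrixBlock> in )
	{
		return in.mapPartitionsToPair(
			(Iterator<Tuple2<MatrixIndexes,MatrixBlock>> iter) -> {
				List<Tuple2<MatrixIndexes,MatrixBlock>> out = new ArrayList<>();
				while( iter.hasNext() ) {
					Tuple2<MatrixIndexes,MatrixBlock> t = iter.next();
					MatrixIndexes ix = new MatrixIndexes(
						t._1().getRowIndex(), t._1().getColumnIndex());
					MatrixBlock blk = new MatrixBlock();
					blk.copy(t._2()); //deep copy of block data
					out.add(new Tuple2<>(ix, blk));
				}
				return out.iterator();
			}, true); //preserve partitioning
	}
}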
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cd5499c5/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
index 49293e4..d783977 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
@@ -20,6 +20,9 @@
 
 package org.apache.sysml.runtime.controlprogram.parfor;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.spark.api.java.JavaPairRDD;
 
 import org.apache.sysml.api.DMLScript;
@@ -30,6 +33,7 @@ import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.spark.data.RDDObject;
+import org.apache.sysml.runtime.instructions.spark.functions.CopyBlockPairFunction;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.MatrixFormatMetaData;
@@ -127,73 +131,81 @@ public class ResultMergeRemoteSpark extends ResultMerge
 		RDDObject ret = null;
 		
-		//determine degree of parallelism
+		//determine degree of parallelism
 		int numRed = (int)determineNumReducers(rlen, clen, brlen, bclen, _numReducers);
-		
+
 		//sanity check for empty src files
 		if( inputs == null || inputs.length==0 )
 			throw new DMLRuntimeException("Execute merge should never be called with no inputs.");
 		
 		try
 		{
-			//Step 1: union over all results
-			JavaPairRDD<MatrixIndexes, MatrixBlock> rdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
-				sec.getRDDHandleForMatrixObject(_inputs[0], InputInfo.BinaryBlockInputInfo);
-			for( int i=1; i<_inputs.length; i++ ) {
-				JavaPairRDD<MatrixIndexes, MatrixBlock> rdd2 = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
-					sec.getRDDHandleForMatrixObject(_inputs[i], InputInfo.BinaryBlockInputInfo);
-				rdd = rdd.union(rdd2);
-			}
-			
-			//Step 2a: merge with compare
-			JavaPairRDD<MatrixIndexes, MatrixBlock> out = null;
-			if( withCompare )
-			{
-				JavaPairRDD<MatrixIndexes, MatrixBlock> compareRdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
-					sec.getRDDHandleForMatrixObject(compare, InputInfo.BinaryBlockInputInfo);
-				
-				//merge values which differ from compare values
-				ResultMergeRemoteSparkWCompare cfun = new ResultMergeRemoteSparkWCompare();
-				out = rdd.groupByKey(numRed) //group all result blocks per key
-					.join(compareRdd) //join compare block and result blocks
-					.mapToPair(cfun); //merge result blocks w/ compare
-			}
-			//Step 2b: merge without compare
-			else
-			{
-				//direct merge in any order (disjointness guaranteed)
-				out = RDDAggregateUtils.mergeByKey(rdd, false);
-			}
+			//note: initial implementation via union over all result rdds discarded due to
+			//stack overflow errors with many parfor tasks, and thus many rdds
+			
+			//Step 1: construct input rdd from all result files of parfor workers
+			//a) construct job conf with all files
+			InputInfo ii = InputInfo.BinaryBlockInputInfo;
+			JobConf job = new JobConf( ResultMergeRemoteMR.class );
+			job.setJobName(jobname);
+			job.setInputFormat(ii.inputFormatClass);
+			Path[] paths = new Path[ inputs.length ];
+			for(int i=0; i<paths.length; i++) {
+				//ensure presence of hdfs if inputs come from memory
+				if( inputs[i].isDirty() )
+					inputs[i].exportData();
+				paths[i] = new Path( inputs[i].getFileName() );
+			}
+			FileInputFormat.setInputPaths(job, paths);
+			
+			//b) create rdd from input files w/ deep copy of keys and blocks
+			JavaPairRDD<MatrixIndexes, MatrixBlock> rdd = sec.getSparkContext()
+				.hadoopRDD(job, ii.inputFormatClass, ii.inputKeyClass, ii.inputValueClass)
+				.mapPartitionsToPair(new CopyBlockPairFunction(true), true);
+			
+			//Step 2a: merge with compare
+			JavaPairRDD<MatrixIndexes, MatrixBlock> out = null;
+			if( withCompare )
+			{
+				JavaPairRDD<MatrixIndexes, MatrixBlock> compareRdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
+					sec.getRDDHandleForMatrixObject(compare, InputInfo.BinaryBlockInputInfo);
+				
+				//merge values which differ from compare values
+				ResultMergeRemoteSparkWCompare cfun = new ResultMergeRemoteSparkWCompare();
+				out = rdd.groupByKey(numRed) //group all result blocks per key
+					.join(compareRdd) //join compare block and result blocks
+					.mapToPair(cfun); //merge result blocks w/ compare
+			}
+			//Step 2b: merge without compare
+			else {
+				//direct merge in any order (disjointness guaranteed)
+				out = RDDAggregateUtils.mergeByKey(rdd, false);
+			}
 			
-			//Step 3: create output rdd handle w/ lineage
-			ret = new RDDObject(out, varname);
-			for( int i=0; i<_inputs.length; i++ ) {
-				//child rdd handles guaranteed to exist
-				RDDObject child = _inputs[i].getRDDHandle();
-				ret.addLineageChild(child);
-			}
+			//Step 3: create output rdd handle w/ lineage
+			ret = new RDDObject(out, varname);
+			if( withCompare )
+				ret.addLineageChild(compare.getRDDHandle());
 		}
-		catch( Exception ex )
-		{
+		catch( Exception ex ) {
			throw new DMLRuntimeException(ex);
 		}
 		
 		//maintain statistics
-		Statistics.incrementNoOfCompiledSPInst();
-		Statistics.incrementNoOfExecutedSPInst();
-		if( DMLScript.STATISTICS ){
+		Statistics.incrementNoOfCompiledSPInst();
+		Statistics.incrementNoOfExecutedSPInst();
+		if( DMLScript.STATISTICS ){
 			Statistics.maintainCPHeavyHitters(jobname, System.nanoTime()-t0);
 		}
-		
+
 		return ret;
 	}
 	
-	private int determineNumReducers(long rlen, long clen, int brlen, int bclen, long numRed)
-	{
+	private int determineNumReducers(long rlen, long clen, int brlen, int bclen, long numRed) {
 		//set the number of mappers and reducers
-		long reducerGroups = Math.max(rlen/brlen,1) * Math.max(clen/bclen, 1);
+		long reducerGroups = Math.max(rlen/brlen,1) * Math.max(clen/bclen, 1);
 		int ret = (int)Math.min( numRed, reducerGroups );
-		
-		return ret;
+		
+		return ret;
 	}
 }
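For intuition on determineNumReducers, a worked example with
illustrative values (all numbers assumed, not from the commit): the
number of reducer groups equals the number of output blocks, capped by
the configured reducer count.

long rlen = 1000000, clen = 1000000; //matrix dimensions (assumed)
int brlen = 1000, bclen = 1000;      //block sizes (assumed)
long numReducers = 200;              //configured parallelism (assumed)

//one reducer group per output block: 1000 * 1000 = 1,000,000 groups
long reducerGroups = Math.max(rlen/brlen, 1) * Math.max(clen/bclen, 1);
//effective parallelism: min(200, 1000000) = 200 reducers
int numRed = (int) Math.min(numReducers, reducerGroups);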
