Repository: incubator-systemml Updated Branches: refs/heads/master 0f8b19703 -> 0b57898cd
[SYSTEMML-1588] Fix parfor spark result merge w/ pending rdd operations This patch fixes issues in the recently reworked parfor spark result merge. Due to stack overflow issues, the spark result merge was changed from an RDD union tree over all results to an export of all results followed by a single RDD construction over these HDFS files. However, the export only covered existing HDFS files or dirty in-memory matrices, but not pending RDD operations. We now properly export the input matrices and guard against eager cleanup by maintaining the lineage of the input RDDs, which still allows for lazy evaluation of the result merge. Note: The eager-cleanup issues with temporary result files did not show up before because the result matrix objects of parfor remote spark are not flagged as existing on HDFS. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/0b57898c Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/0b57898c Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/0b57898c Branch: refs/heads/master Commit: 0b57898cddebe287350576948fdb1e13f9f3a813 Parents: 0f8b197 Author: Matthias Boehm <[email protected]> Authored: Sat May 6 23:45:36 2017 -0700 Committer: Matthias Boehm <[email protected]> Committed: Sun May 7 01:10:03 2017 -0700 ---------------------------------------------------------------------- .../parfor/ResultMergeRemoteSpark.java | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0b57898c/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java index d783977..44b7dc2 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java @@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.spark.api.java.JavaPairRDD; - +import org.apache.spark.api.java.JavaSparkContext; import org.apache.sysml.api.DMLScript; import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.parser.Expression.ValueType; @@ -151,10 +151,12 @@ public class ResultMergeRemoteSpark extends ResultMerge job.setInputFormat(ii.inputFormatClass); Path[] paths = new Path[ inputs.length ]; for(int i=0; i<paths.length; i++) { - //ensure presence of hdfs if inputs come from memory - if( inputs[i].isDirty() ) - inputs[i].exportData(); + //ensure input exists on hdfs (e.g., if in-memory or RDD) + inputs[i].exportData(); paths[i] = new Path( inputs[i].getFileName() ); + //update rdd handle to allow lazy evaluation by guarding + //against cleanup of temporary result files + setRDDHandleForMerge(inputs[i], sec); } FileInputFormat.setInputPaths(job, paths); @@ -184,6 +186,8 @@ public class ResultMergeRemoteSpark extends ResultMerge //Step 3: create output rdd handle w/ lineage ret = new RDDObject(out, varname); + for(int i=0; i<paths.length; i++) + ret.addLineageChild(inputs[i].getRDDHandle()); if( withCompare ) ret.addLineageChild(compare.getRDDHandle()); } @@ -208,4 +212,15 @@ public class ResultMergeRemoteSpark extends ResultMerge return ret; } + + @SuppressWarnings("unchecked") + private void setRDDHandleForMerge(MatrixObject mo, SparkExecutionContext sec) { + InputInfo iinfo = InputInfo.BinaryBlockInputInfo; + JavaSparkContext sc = sec.getSparkContext(); + JavaPairRDD<MatrixIndexes,MatrixBlock> rdd = 
(JavaPairRDD<MatrixIndexes,MatrixBlock>) + sc.hadoopFile( mo.getFileName(), iinfo.inputFormatClass, iinfo.inputKeyClass, iinfo.inputValueClass); + RDDObject rddhandle = new RDDObject(rdd, mo.getVarName()); + rddhandle.setHDFSFile(true); + mo.setRDDHandle(rddhandle); + } }
