Repository: incubator-systemml Updated Branches: refs/heads/master 88ad73939 -> 9fd834ed2
[SYSTEMML-1390] Avoid unnecessary caching of parfor spark dpe inputs Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/9fd834ed Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/9fd834ed Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/9fd834ed Branch: refs/heads/master Commit: 9fd834ed235dea221a6b255e9fabf35b506491f8 Parents: 88ad739 Author: Matthias Boehm <[email protected]> Authored: Fri Mar 10 01:05:13 2017 -0800 Committer: Matthias Boehm <[email protected]> Committed: Fri Mar 10 11:54:33 2017 -0800 ---------------------------------------------------------------------- .../context/SparkExecutionContext.java | 2 +- .../parfor/RemoteDPParForSpark.java | 20 ++++++++++++++++++-- 2 files changed, 19 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9fd834ed/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index b7fe6e8..faababe 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -1244,7 +1244,7 @@ public class SparkExecutionContext extends ExecutionContext return jsc.sc().getPersistentRDDs().contains(rddID); } - private boolean isRDDCached( int rddID ) { + public boolean isRDDCached( int rddID ) { //check that rdd is marked for caching JavaSparkContext jsc = getSparkContext(); if( !jsc.sc().getPersistentRDDs().contains(rddID) ) { 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9fd834ed/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java index 3a27b66..b801402 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java @@ -47,6 +47,7 @@ import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock; import org.apache.sysml.runtime.instructions.spark.data.DatasetObject; +import org.apache.sysml.runtime.instructions.spark.data.RDDObject; import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils; import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils; import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils.DataFrameExtractIDFunction; @@ -120,6 +121,7 @@ public class RemoteDPParForSpark return ret; } + @SuppressWarnings("unchecked") private static JavaPairRDD<Long, Writable> getPartitionedInput(SparkExecutionContext sec, String matrixvar, OutputInfo oi, PDataPartitionFormat dpf) throws DMLRuntimeException @@ -146,14 +148,28 @@ public class RemoteDPParForSpark return prepinput.mapToPair(new DataFrameToRowBinaryBlockFunction( mc.getCols(), dsObj.isVectorBased(), dsObj.containsID())); } - //default binary block input rdd - else + //binary block input rdd without grouping + else if( !requiresGrouping(dpf, mo) ) { //get input rdd and data partitioning JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable(matrixvar); 
DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, dpf); return in.flatMapToPair(dpfun); } + //default binary block input rdd with grouping + else + { + //get input rdd, avoid unnecessary caching if input is checkpoint and not cached yet + //to reduce memory pressure for shuffle and subsequent operations + JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable(matrixvar); + if( mo.getRDDHandle().isCheckpointRDD() && !sec.isRDDCached(in.id()) ) + in = (JavaPairRDD<MatrixIndexes,MatrixBlock>)((RDDObject) + mo.getRDDHandle().getLineageChilds().get(0)).getRDD(); + + //data partitioning of input rdd + DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, dpf); + return in.flatMapToPair(dpfun); + } + } //determines if given input matrix requires grouping of partial partition slices
