Repository: incubator-systemml
Updated Branches:
  refs/heads/master 88ad73939 -> 9fd834ed2


[SYSTEMML-1390] Avoid unnecessary caching of parfor spark dpe inputs

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/9fd834ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/9fd834ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/9fd834ed

Branch: refs/heads/master
Commit: 9fd834ed235dea221a6b255e9fabf35b506491f8
Parents: 88ad739
Author: Matthias Boehm <[email protected]>
Authored: Fri Mar 10 01:05:13 2017 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Fri Mar 10 11:54:33 2017 -0800

----------------------------------------------------------------------
 .../context/SparkExecutionContext.java          |  2 +-
 .../parfor/RemoteDPParForSpark.java             | 20 ++++++++++++++++++--
 2 files changed, 19 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9fd834ed/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index b7fe6e8..faababe 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -1244,7 +1244,7 @@ public class SparkExecutionContext extends 
ExecutionContext
                return jsc.sc().getPersistentRDDs().contains(rddID);
        }
 
-       private boolean isRDDCached( int rddID ) {
+       public boolean isRDDCached( int rddID ) {
                //check that rdd is marked for caching
                JavaSparkContext jsc = getSparkContext();
                if( !jsc.sc().getPersistentRDDs().contains(rddID) ) {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/9fd834ed/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
index 3a27b66..b801402 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
@@ -47,6 +47,7 @@ import 
org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
 import org.apache.sysml.runtime.instructions.spark.data.DatasetObject;
+import org.apache.sysml.runtime.instructions.spark.data.RDDObject;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import 
org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils.DataFrameExtractIDFunction;
@@ -120,6 +121,7 @@ public class RemoteDPParForSpark
                return ret;
        }
        
+       @SuppressWarnings("unchecked")
        private static JavaPairRDD<Long, Writable> 
getPartitionedInput(SparkExecutionContext sec, 
                        String matrixvar, OutputInfo oi, PDataPartitionFormat 
dpf) 
                throws DMLRuntimeException 
@@ -146,14 +148,28 @@ public class RemoteDPParForSpark
                        return prepinput.mapToPair(new 
DataFrameToRowBinaryBlockFunction(
                                        mc.getCols(), dsObj.isVectorBased(), 
dsObj.containsID()));
                }
-               //default binary block input rdd
-               else
+               //binary block input rdd without grouping
+               else if( !requiresGrouping(dpf, mo) ) 
                {
                        //get input rdd and data partitioning 
                        JavaPairRDD<MatrixIndexes,MatrixBlock> in = 
sec.getBinaryBlockRDDHandleForVariable(matrixvar);
                        DataPartitionerRemoteSparkMapper dpfun = new 
DataPartitionerRemoteSparkMapper(mc, ii, oi, dpf);
                        return in.flatMapToPair(dpfun);
                }
+               //default binary block input rdd with grouping
+               else
+               {
+                       //get input rdd, avoid unnecessary caching if input is 
checkpoint and not cached yet
+                       //to reduce memory pressure for shuffle and subsequent 
+                       JavaPairRDD<MatrixIndexes,MatrixBlock> in = 
sec.getBinaryBlockRDDHandleForVariable(matrixvar);
+                       if( mo.getRDDHandle().isCheckpointRDD() && 
!sec.isRDDCached(in.id()) )
+                               in = 
(JavaPairRDD<MatrixIndexes,MatrixBlock>)((RDDObject)
+                                               
mo.getRDDHandle().getLineageChilds().get(0)).getRDD();
+                       
+                       //data partitioning of input rdd 
+                       DataPartitionerRemoteSparkMapper dpfun = new 
DataPartitionerRemoteSparkMapper(mc, ii, oi, dpf);
+                       return in.flatMapToPair(dpfun);
+               }
        } 
        
        //determines if given input matrix requires grouping of partial 
partition slices

Reply via email to