[SYSTEMML-1314] Robust memory handling for RDD parallelization 

So far, RDD parallelization only checked whether the pinned matrix block
and the partitioned matrix together fit into the driver memory budget;
otherwise, the matrix was exported to HDFS (with a multi-threaded write
in binary block format) and the RDD created from the file. However,
since parallelized RDDs continue to reside in the driver, subsequent CP
operations, broadcasts, collects, or RDD parallelizations might run into
OOMs at the driver.

This patch establishes a memory manager for parallelized RDDs that
allows reserving memory and keeps track of currently referenced RDDs.
We keep the traditional 70% operation memory and 15% buffer pool, but
use an additional 10% for potentially parallelized RDDs. Note that this
slightly over-provisions the available memory, as usually only 85-90% of
the max heap is available to the application, but it prevents
unnecessarily conservative memory handling in the common case.
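
For illustration, a minimal, self-contained sketch of the reservation
accounting idea (hypothetical class and member names; the committed
implementation is the MemoryManagerParRDDs helper in the diff below,
which sizes the budget via InfrastructureAnalyzer.getLocalMaxMemory()):

  import java.util.HashMap;

  // Hedged sketch, not the committed code: reserve against a fixed fraction
  // of the max heap, track per-RDD sizes, and release them on deregistration.
  class ParRDDBudgetSketch {
    private final long limit = (long)(0.1 * Runtime.getRuntime().maxMemory());
    private long size = 0;
    private final HashMap<Integer, Long> rdds = new HashMap<>();

    synchronized boolean reserve(long rddSize) {
      boolean ok = (size + rddSize < limit); //fits into the 10% budget?
      size += ok ? rddSize : 0;
      return ok;
    }
    synchronized void register(int rddID, long rddSize) {
      rdds.put(rddID, rddSize); //remember size for later release
    }
    synchronized void deregister(int rddID) {
      Long rddSize = rdds.remove(rddID);
      if( rddSize != null )
        size -= rddSize;
    }
  }

On RDD parallelization, the estimated size of the partitioned matrix or
frame is reserved before calling parallelize; if the reservation (or the
usual collect-memory check) fails, the object is exported to HDFS and
the RDD is created from the file instead. The reservation is released
again when the RDD lineage object is cleaned up.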

Down the road, however, we should unify the entire buffer pool and
memory handling to manage CP intermediates, broadcasts, parallelized
RDDs, etc. in one common memory manager.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/c7f5f086
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/c7f5f086
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/c7f5f086

Branch: refs/heads/master
Commit: c7f5f086f2c7e9bb11f74e28f78a6ec70625a231
Parents: 31cb253
Author: Matthias Boehm <[email protected]>
Authored: Fri Mar 17 00:11:54 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Fri Mar 17 12:52:41 2017 -0700

----------------------------------------------------------------------
 .../context/SparkExecutionContext.java          | 177 ++++++++-----------
 .../instructions/spark/data/RDDObject.java      |  18 +-
 2 files changed, 81 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c7f5f086/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 648c51e..75a6334 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.controlprogram.context;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -49,7 +50,6 @@ import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.instructions.cp.Data;
-import org.apache.sysml.runtime.instructions.spark.SPInstruction;
 import org.apache.sysml.runtime.instructions.spark.data.BroadcastObject;
 import org.apache.sysml.runtime.instructions.spark.data.LineageObject;
 import org.apache.sysml.runtime.instructions.spark.data.PartitionedBlock;
@@ -92,10 +92,15 @@ public class SparkExecutionContext extends ExecutionContext
        //executor memory and relative fractions as obtained from the spark configuration
        private static SparkClusterConfig _sconf = null;
        
-       // Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one. 
-       // This limitation may eventually be removed; see SPARK-2243 for more details.
+       //singleton spark context (as there can be only one spark context per JVM) 
        private static JavaSparkContext _spctx = null; 
        
+       //registry of parallelized RDDs to enforce that at any time, we spent at most 
+       //10% of JVM max heap size for parallelized RDDs; if this is not sufficient,
+       //matrices or frames are exported to HDFS and the RDDs are created from files.
+       //TODO unify memory management for CP, par RDDs, and potentially broadcasts
+       private static MemoryManagerParRDDs _parRDDs = new MemoryManagerParRDDs(0.1);
+       
        static {
                // for internal debugging only
                if( LDEBUG ) {
@@ -213,6 +218,8 @@ public class SparkExecutionContext extends ExecutionContext
                                SparkConf conf = createSystemMLSparkConf();
                                _spctx = new JavaSparkContext(conf);
                        }
+                       
+                       _parRDDs.clear();
                }
                
                // Set warning if spark.driver.maxResultSize is not set. It needs to be set before starting Spark Context for CP collect 
@@ -327,6 +334,7 @@ public class SparkExecutionContext extends ExecutionContext
                //always available and hence only store generic references in 
                //matrix object while all the logic is in the SparkExecContext
                
+               JavaSparkContext sc = getSparkContext();
                JavaPairRDD<?,?> rdd = null;
                //CASE 1: rdd already existing (reuse if checkpoint or trigger
                //pending rdd operations if not yet cached but prevent to re-evaluate 
@@ -342,19 +350,22 @@ public class SparkExecutionContext extends ExecutionContext
                {
                        //get in-memory matrix block and parallelize it
                        //w/ guarded parallelize (fallback to export, rdd from file if too large)
+                       MatrixCharacteristics mc = mo.getMatrixCharacteristics();
                        boolean fromFile = false;
-                       if( !OptimizerUtils.checkSparkCollectMemoryBudget(mo.getMatrixCharacteristics(), 0) ) {
+                       if( !OptimizerUtils.checkSparkCollectMemoryBudget(mc, 0) || !_parRDDs.reserve(
+                                       OptimizerUtils.estimatePartitionedSizeExactSparsity(mc))) {
                                if( mo.isDirty() ) { //write only if necessary
                                        mo.exportData();
                                }
-                               rdd = getSparkContext().hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
+                               rdd = sc.hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
                                rdd = SparkUtils.copyBinaryBlockMatrix((JavaPairRDD<MatrixIndexes, MatrixBlock>)rdd); //cp is workaround for read bug                   
                                fromFile = true;
                        }
                        else { //default case
                                MatrixBlock mb = mo.acquireRead(); //pin matrix in memory
-                               rdd = toMatrixJavaPairRDD(getSparkContext(), mb, (int)mo.getNumRowsPerBlock(), (int)mo.getNumColumnsPerBlock());
+                               rdd = toMatrixJavaPairRDD(sc, mb, (int)mo.getNumRowsPerBlock(), (int)mo.getNumColumnsPerBlock());
                                mo.release(); //unpin matrix
+                               _parRDDs.registerRDD(rdd.id(), OptimizerUtils.estimatePartitionedSizeExactSparsity(mc), true);
                        }
                        
                        //keep rdd handle for future operations on it
@@ -368,17 +379,17 @@ public class SparkExecutionContext extends ExecutionContext
                        // parallelize hdfs-resident file
                        // For binary block, these are: SequenceFileInputFormat.class, MatrixIndexes.class, MatrixBlock.class
                        if(inputInfo == InputInfo.BinaryBlockInputInfo) {
-                               rdd = getSparkContext().hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
+                               rdd = sc.hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
                                //note: this copy is still required in Spark 1.4 because spark hands out whatever the inputformat
                                //recordreader returns; the javadoc explicitly recommend to copy all key/value pairs
                                rdd = SparkUtils.copyBinaryBlockMatrix((JavaPairRDD<MatrixIndexes, MatrixBlock>)rdd); //cp is workaround for read bug
                        }
                        else if(inputInfo == InputInfo.TextCellInputInfo || inputInfo == InputInfo.CSVInputInfo || inputInfo == InputInfo.MatrixMarketInputInfo) {
-                               rdd = getSparkContext().hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
+                               rdd = sc.hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
                                rdd = ((JavaPairRDD<LongWritable, Text>)rdd).mapToPair( new CopyTextInputFunction() ); //cp is workaround for read bug
                        }
                        else if(inputInfo == InputInfo.BinaryCellInputInfo) {
-                               rdd = getSparkContext().hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
+                               rdd = sc.hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
                                rdd = ((JavaPairRDD<MatrixIndexes, MatrixCell>)rdd).mapToPair( new CopyBinaryCellFunction() ); //cp is workaround for read bug
                        }
                        else {
@@ -415,6 +426,7 @@ public class SparkExecutionContext extends ExecutionContext
                InputInfo inputInfo2 = (inputInfo==InputInfo.BinaryBlockInputInfo) ? 
                                InputInfo.BinaryBlockFrameInputInfo : inputInfo;
                
+               JavaSparkContext sc = getSparkContext();
                JavaPairRDD<?,?> rdd = null;
                //CASE 1: rdd already existing (reuse if checkpoint or trigger
                //pending rdd operations if not yet cached but prevent to re-evaluate 
@@ -430,19 +442,22 @@ public class SparkExecutionContext extends ExecutionContext
                {
                        //get in-memory matrix block and parallelize it
                        //w/ guarded parallelize (fallback to export, rdd from file if too large)
+                       MatrixCharacteristics mc = fo.getMatrixCharacteristics();
                        boolean fromFile = false;
-                       if( !OptimizerUtils.checkSparkCollectMemoryBudget(fo.getMatrixCharacteristics(), 0) ) {
+                       if( !OptimizerUtils.checkSparkCollectMemoryBudget(mc, 0) || !_parRDDs.reserve(
+                                       OptimizerUtils.estimatePartitionedSizeExactSparsity(mc)) ) {
                                if( fo.isDirty() ) { //write only if necessary
                                        fo.exportData();
                                }
-                               rdd = getSparkContext().hadoopFile( fo.getFileName(), inputInfo2.inputFormatClass, inputInfo2.inputKeyClass, inputInfo2.inputValueClass);
+                               rdd = sc.hadoopFile( fo.getFileName(), inputInfo2.inputFormatClass, inputInfo2.inputKeyClass, inputInfo2.inputValueClass);
                                rdd = ((JavaPairRDD<LongWritable, FrameBlock>)rdd).mapToPair( new CopyFrameBlockPairFunction() ); //cp is workaround for read bug                       
                                fromFile = true;
                        }
                        else { //default case
                                FrameBlock fb = fo.acquireRead(); //pin frame in memory
-                               rdd = toFrameJavaPairRDD(getSparkContext(), fb);
+                               rdd = toFrameJavaPairRDD(sc, fb);
                                fo.release(); //unpin frame
+                               _parRDDs.registerRDD(rdd.id(), OptimizerUtils.estimatePartitionedSizeExactSparsity(mc), true);
                        }
                        
                        //keep rdd handle for future operations on it
@@ -456,13 +471,13 @@ public class SparkExecutionContext extends ExecutionContext
                        // parallelize hdfs-resident file
                        // For binary block, these are: SequenceFileInputFormat.class, MatrixIndexes.class, MatrixBlock.class
                        if(inputInfo2 == InputInfo.BinaryBlockFrameInputInfo) {
-                               rdd = getSparkContext().hadoopFile( fo.getFileName(), inputInfo2.inputFormatClass, inputInfo2.inputKeyClass, inputInfo2.inputValueClass);
+                               rdd = sc.hadoopFile( fo.getFileName(), inputInfo2.inputFormatClass, inputInfo2.inputKeyClass, inputInfo2.inputValueClass);
                                //note: this copy is still required in Spark 1.4 because spark hands out whatever the inputformat
                                //recordreader returns; the javadoc explicitly recommend to copy all key/value pairs
                                rdd = ((JavaPairRDD<LongWritable, FrameBlock>)rdd).mapToPair( new CopyFrameBlockPairFunction() ); //cp is workaround for read bug
                        }
                        else if(inputInfo2 == InputInfo.TextCellInputInfo || inputInfo2 == InputInfo.CSVInputInfo || inputInfo2 == InputInfo.MatrixMarketInputInfo) {
-                               rdd = getSparkContext().hadoopFile( fo.getFileName(), inputInfo2.inputFormatClass, inputInfo2.inputKeyClass, inputInfo2.inputValueClass);
+                               rdd = sc.hadoopFile( fo.getFileName(), inputInfo2.inputFormatClass, inputInfo2.inputKeyClass, inputInfo2.inputValueClass);
                                rdd = ((JavaPairRDD<LongWritable, Text>)rdd).mapToPair( new CopyTextInputFunction() ); //cp is workaround for read bug
                        }
                        else if(inputInfo2 == InputInfo.BinaryCellInputInfo) {
@@ -1108,10 +1123,13 @@ public class SparkExecutionContext extends ExecutionContext
                //incl deferred hdfs file removal (only if metadata set by cleanup call)
                if( lob instanceof RDDObject ) {
                        RDDObject rdd = (RDDObject)lob;
+                       int rddID = rdd.getRDD().id();
                        cleanupRDDVariable(rdd.getRDD());
                        if( rdd.getHDFSFilename()!=null ) { //deferred file removal
                                MapReduceTool.deleteFileWithMTDIfExistOnHDFS(rdd.getHDFSFilename());
                        }
+                       if( rdd.isParallelizedRDD() )
+                               _parRDDs.deregisterRDD(rddID);
                }
                else if( lob instanceof BroadcastObject ) {
                        PartitionedBroadcast pbm = ((BroadcastObject)lob).getBroadcast();
@@ -1259,100 +1277,6 @@ public class SparkExecutionContext extends ExecutionContext
                }
                return false;
        }
-       
-       
-       ///////////////////////////////////////////
-       // Debug String Handling (see explain); TODO to be removed
-       ///////
-
-       public void setDebugString(SPInstruction inst, String outputVarName) 
-               throws DMLRuntimeException 
-       {
-               RDDObject parentLineage = getMatrixObject(outputVarName).getRDDHandle();
-               
-               if( parentLineage == null || parentLineage.getRDD() == null )
-                       return;
-       
-               JavaPairRDD<?, ?> out = parentLineage.getRDD();
-               JavaPairRDD<?, ?> in1 = null; 
-               JavaPairRDD<?, ?> in2 = null;
-               String input1VarName = null; 
-               String input2VarName = null;
-               if(parentLineage.getLineageChilds() != null) {
-                       for(LineageObject child : parentLineage.getLineageChilds()) {
-                               if(child instanceof RDDObject) {
-                                       if(in1 == null) {
-                                               in1 = ((RDDObject) child).getRDD();
-                                               input1VarName = child.getVarName();
-                                       }
-                                       else if(in2 == null) {
-                                               in2 = ((RDDObject) child).getRDD();
-                                               input2VarName = child.getVarName();
-                                       }
-                                       else {
-                                               throw new DMLRuntimeException("PRINT_EXPLAIN_WITH_LINEAGE not yet supported for three outputs");
-                                       }
-                               }
-                       }
-               }
-               setLineageInfoForExplain(inst, out, in1, input1VarName, in2, input2VarName);
-       }
-       
-       // The most expensive operation here is rdd.toDebugString() which can be a major hit because
-       // of unrolling lazy evaluation of Spark. Hence, it is guarded against it along with flag 'PRINT_EXPLAIN_WITH_LINEAGE' which is 
-       // enabled only through MLContext. This way, it doesnot affect our performance evaluation through non-MLContext path
-       @SuppressWarnings("unused")
-       private void setLineageInfoForExplain(SPInstruction inst, 
-                       JavaPairRDD<?, ?> out, 
-                       JavaPairRDD<?, ?> in1, String in1Name, 
-                       JavaPairRDD<?, ?> in2, String in2Name) throws DMLRuntimeException {
-               
-                       
-               // RDDInfo outInfo = org.apache.spark.storage.RDDInfo.fromRdd(out.rdd());
-               
-               // First fetch start lines from input RDDs
-               String startLine1 = null; 
-               String startLine2 = null;
-               int i1length = 0, i2length = 0;
-               if(in1 != null) {
-                       String [] lines = in1.toDebugString().split("\\r?\\n");
-                       startLine1 = SparkUtils.getStartLineFromSparkDebugInfo(lines[0]); // lines[0].substring(4, lines[0].length());
-                       i1length = lines.length;
-               }
-               if(in2 != null) {
-                       String [] lines = in2.toDebugString().split("\\r?\\n");
-                       startLine2 =  SparkUtils.getStartLineFromSparkDebugInfo(lines[0]); // lines[0].substring(4, lines[0].length());
-                       i2length = lines.length;
-               }
-               
-               String outDebugString = "";
-               int skip = 0;
-               
-               // Now process output RDD and replace inputRDD debug string by the matrix variable name
-               String [] outLines = out.toDebugString().split("\\r?\\n");
-               for(int i = 0; i < outLines.length; i++) {
-                       if(skip > 0) {
-                               skip--;
-                               // outDebugString += "\nSKIP:" + outLines[i];
-                       }
-                       else if(startLine1 != null && outLines[i].contains(startLine1)) {
-                               String prefix = SparkUtils.getPrefixFromSparkDebugInfo(outLines[i]); // outLines[i].substring(0, outLines[i].length() - startLine1.length());
-                               outDebugString += "\n" + prefix + "[[" + in1Name + "]]";
-                               //outDebugString += "\n{" + prefix + "}[[" + in1Name + "]] => " + outLines[i];
-                               skip = i1length - 1;  
-                       }
-                       else if(startLine2 != null && outLines[i].contains(startLine2)) {
-                               String prefix = SparkUtils.getPrefixFromSparkDebugInfo(outLines[i]); // outLines[i].substring(0, outLines[i].length() - startLine2.length());
-                               outDebugString += "\n" + prefix + "[[" + in2Name + "]]";
-                               skip = i2length - 1;
-                       }
-                       else {
-                               outDebugString += "\n" + outLines[i];
-                       }
-               }
-               
-       }
-       
 
        ///////////////////////////////////////////
        // Spark configuration handling 
@@ -1610,4 +1534,41 @@ public class SparkExecutionContext extends ExecutionContext
                        return sb.toString();
                }
        }
+       
+       private static class MemoryManagerParRDDs 
+       {
+               private final long _limit;
+               private long _size;
+               private HashMap<Integer, Long> _rdds;
+               
+               public MemoryManagerParRDDs(double fractionMem) {
+                       _limit = (long)(fractionMem * InfrastructureAnalyzer.getLocalMaxMemory());
+                       _size = 0;
+                       _rdds = new HashMap<Integer, Long>();
+               }
+
+               public synchronized boolean reserve(long rddSize) {
+                       boolean ret = (rddSize + _size < _limit);
+                       _size += ret ? rddSize : 0;
+                       return ret;
+               }
+               
+               public synchronized void registerRDD(int rddID, long rddSize, boolean reserved) {
+                       if( !reserved ) {
+                               throw new RuntimeException("Unsupported rdd registration "
+                                               + "without size reservation for "+rddSize+" bytes.");
+                       }
+                       _rdds.put(rddID, rddSize);
+               }
+               
+               public synchronized void deregisterRDD(int rddID) {
+                       long rddSize = _rdds.remove(rddID);
+                       _size -= rddSize;
+               }
+
+               public synchronized void clear() {
+                       _size = 0;
+                       _rdds.clear();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c7f5f086/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
index fd5dad3..67f97a3 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
@@ -29,24 +29,22 @@ public class RDDObject extends LineageObject
        private boolean _checkpointed = false; //created via checkpoint instruction
        private boolean _hdfsfile = false;     //created from hdfs file
        private String  _hdfsFname = null;     //hdfs filename, if created from hdfs.  
+       private boolean _parRDD = false;
        
        public RDDObject( JavaPairRDD<?,?> rddvar, String varName) {
                super(varName);
                _rddHandle = rddvar;
        }
 
-       public JavaPairRDD<?,?> getRDD()
-       {
+       public JavaPairRDD<?,?> getRDD() {
                return _rddHandle;
        }
        
-       public void setCheckpointRDD( boolean flag )
-       {
+       public void setCheckpointRDD( boolean flag ) {
                _checkpointed = flag;
        }
        
-       public boolean isCheckpointRDD() 
-       {
+       public boolean isCheckpointRDD() {
                return _checkpointed;
        }
        
@@ -66,6 +64,14 @@ public class RDDObject extends LineageObject
                return _hdfsFname;
        }
        
+       public void setParallelizedRDD( boolean flag ) {
+               _parRDD = flag;
+       }
+       
+       public boolean isParallelizedRDD() {
+               return _parRDD; 
+       }
+       
 
        /**
         * Indicates if rdd is an hdfs file or a checkpoint over an hdfs file;
