[SYSTEMML-1314] Robust memory handling for RDD parallelization

So far, RDD parallelization only checked whether the pinned matrix block and the partitioned matrix together fit into the driver memory budget; otherwise, the matrix is exported to HDFS (with a multi-threaded write in binary block format). However, since these parallelized RDDs continue to reside in the driver, subsequent CP operations, broadcasts, collects, or RDD parallelizations might run into OOMs at the driver.
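For illustration, a minimal, self-contained sketch of this guarded-parallelize decision; the helper names and the 70% driver-budget fraction below are assumptions for the example, not the SystemML API (in the code, the check is done via OptimizerUtils.checkSparkCollectMemoryBudget and the fallback re-reads the exported file via sc.hadoopFile):

// Illustrative sketch only (not the SystemML implementation): decide whether an
// in-memory matrix can be parallelized into an RDD on the driver, or must be
// exported to HDFS first and re-read as an RDD from file.
public class GuardedParallelizeSketch
{
	//assumed driver memory budget: 70% of max heap (operation memory)
	private static long driverBudget() {
		return (long) (0.7 * Runtime.getRuntime().maxMemory());
	}

	//pre-patch guard: pinned in-memory block and partitioned RDD copy
	//must fit together into the driver memory budget
	private static boolean fitsDriverBudget(long pinnedSize, long partitionedSize) {
		return pinnedSize + partitionedSize <= driverBudget();
	}

	public static void main(String[] args) {
		long pinned = 512L << 20;      //e.g., 512MB pinned matrix block
		long partitioned = 600L << 20; //e.g., 600MB partitioned (block-wise) copy
		if( fitsDriverBudget(pinned, partitioned) )
			System.out.println("parallelize the in-memory block into an RDD");
		else
			System.out.println("export to HDFS (binary block) and create the RDD from file");
	}
}

The patch keeps this guard but additionally requires a successful reservation against the new parallelized-RDD budget, as described next.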
This patch establishes a memory manager for parallelized RDDs that allows reserving memory and keeps track of currently referenced RDDs. We keep the traditional 70% operation memory and 15% buffer pool, but use an additional 10% for potentially parallelized RDDs. Note that this slightly over-provisions the available memory, as usually only 85-90% of the max heap is available to the application, but it prevents unnecessarily conservative memory handling in the common case. Down the road, however, we should unify the entire buffer pool and memory handling to manage CP intermediates, broadcasts, parallelized RDDs, etc. in one common memory manager. (A simplified, standalone sketch of the reservation lifecycle is shown after the diff below.)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/c7f5f086
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/c7f5f086
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/c7f5f086

Branch: refs/heads/master
Commit: c7f5f086f2c7e9bb11f74e28f78a6ec70625a231
Parents: 31cb253
Author: Matthias Boehm <[email protected]>
Authored: Fri Mar 17 00:11:54 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Fri Mar 17 12:52:41 2017 -0700

----------------------------------------------------------------------
 .../context/SparkExecutionContext.java     | 177 ++++++++-----------
 .../instructions/spark/data/RDDObject.java |  18 +-
 2 files changed, 81 insertions(+), 114 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c7f5f086/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------

diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index 648c51e..75a6334 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -20,6 +20,7 @@ package org.apache.sysml.runtime.controlprogram.context; import java.io.IOException; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -49,7 +50,6 @@ import org.apache.sysml.runtime.controlprogram.caching.FrameObject; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysml.runtime.instructions.cp.Data; -import org.apache.sysml.runtime.instructions.spark.SPInstruction; import org.apache.sysml.runtime.instructions.spark.data.BroadcastObject; import org.apache.sysml.runtime.instructions.spark.data.LineageObject; import org.apache.sysml.runtime.instructions.spark.data.PartitionedBlock; @@ -92,10 +92,15 @@ public class SparkExecutionContext extends ExecutionContext //executor memory and relative fractions as obtained from the spark configuration private static SparkClusterConfig _sconf = null; - // Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one. - // This limitation may eventually be removed; see SPARK-2243 for more details.
+ //singleton spark context (as there can be only one spark context per JVM) private static JavaSparkContext _spctx = null; + //registry of parallelized RDDs to enforce that at any time, we spent at most + //10% of JVM max heap size for parallelized RDDs; if this is not sufficient, + //matrices or frames are exported to HDFS and the RDDs are created from files. + //TODO unify memory management for CP, par RDDs, and potentially broadcasts + private static MemoryManagerParRDDs _parRDDs = new MemoryManagerParRDDs(0.1); + static { // for internal debugging only if( LDEBUG ) { @@ -213,6 +218,8 @@ public class SparkExecutionContext extends ExecutionContext SparkConf conf = createSystemMLSparkConf(); _spctx = new JavaSparkContext(conf); } + + _parRDDs.clear(); } // Set warning if spark.driver.maxResultSize is not set. It needs to be set before starting Spark Context for CP collect @@ -327,6 +334,7 @@ public class SparkExecutionContext extends ExecutionContext //always available and hence only store generic references in //matrix object while all the logic is in the SparkExecContext + JavaSparkContext sc = getSparkContext(); JavaPairRDD<?,?> rdd = null; //CASE 1: rdd already existing (reuse if checkpoint or trigger //pending rdd operations if not yet cached but prevent to re-evaluate @@ -342,19 +350,22 @@ public class SparkExecutionContext extends ExecutionContext { //get in-memory matrix block and parallelize it //w/ guarded parallelize (fallback to export, rdd from file if too large) + MatrixCharacteristics mc = mo.getMatrixCharacteristics(); boolean fromFile = false; - if( !OptimizerUtils.checkSparkCollectMemoryBudget(mo.getMatrixCharacteristics(), 0) ) { + if( !OptimizerUtils.checkSparkCollectMemoryBudget(mc, 0) || !_parRDDs.reserve( + OptimizerUtils.estimatePartitionedSizeExactSparsity(mc))) { if( mo.isDirty() ) { //write only if necessary mo.exportData(); } - rdd = getSparkContext().hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass); + rdd = sc.hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass); rdd = SparkUtils.copyBinaryBlockMatrix((JavaPairRDD<MatrixIndexes, MatrixBlock>)rdd); //cp is workaround for read bug fromFile = true; } else { //default case MatrixBlock mb = mo.acquireRead(); //pin matrix in memory - rdd = toMatrixJavaPairRDD(getSparkContext(), mb, (int)mo.getNumRowsPerBlock(), (int)mo.getNumColumnsPerBlock()); + rdd = toMatrixJavaPairRDD(sc, mb, (int)mo.getNumRowsPerBlock(), (int)mo.getNumColumnsPerBlock()); mo.release(); //unpin matrix + _parRDDs.registerRDD(rdd.id(), OptimizerUtils.estimatePartitionedSizeExactSparsity(mc), true); } //keep rdd handle for future operations on it @@ -368,17 +379,17 @@ public class SparkExecutionContext extends ExecutionContext // parallelize hdfs-resident file // For binary block, these are: SequenceFileInputFormat.class, MatrixIndexes.class, MatrixBlock.class if(inputInfo == InputInfo.BinaryBlockInputInfo) { - rdd = getSparkContext().hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass); + rdd = sc.hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass); //note: this copy is still required in Spark 1.4 because spark hands out whatever the inputformat //recordreader returns; the javadoc explicitly recommend to copy all key/value pairs rdd = SparkUtils.copyBinaryBlockMatrix((JavaPairRDD<MatrixIndexes, MatrixBlock>)rdd); //cp 
is workaround for read bug } else if(inputInfo == InputInfo.TextCellInputInfo || inputInfo == InputInfo.CSVInputInfo || inputInfo == InputInfo.MatrixMarketInputInfo) { - rdd = getSparkContext().hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass); + rdd = sc.hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass); rdd = ((JavaPairRDD<LongWritable, Text>)rdd).mapToPair( new CopyTextInputFunction() ); //cp is workaround for read bug } else if(inputInfo == InputInfo.BinaryCellInputInfo) { - rdd = getSparkContext().hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass); + rdd = sc.hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass); rdd = ((JavaPairRDD<MatrixIndexes, MatrixCell>)rdd).mapToPair( new CopyBinaryCellFunction() ); //cp is workaround for read bug } else { @@ -415,6 +426,7 @@ public class SparkExecutionContext extends ExecutionContext InputInfo inputInfo2 = (inputInfo==InputInfo.BinaryBlockInputInfo) ? InputInfo.BinaryBlockFrameInputInfo : inputInfo; + JavaSparkContext sc = getSparkContext(); JavaPairRDD<?,?> rdd = null; //CASE 1: rdd already existing (reuse if checkpoint or trigger //pending rdd operations if not yet cached but prevent to re-evaluate @@ -430,19 +442,22 @@ public class SparkExecutionContext extends ExecutionContext { //get in-memory matrix block and parallelize it //w/ guarded parallelize (fallback to export, rdd from file if too large) + MatrixCharacteristics mc = fo.getMatrixCharacteristics(); boolean fromFile = false; - if( !OptimizerUtils.checkSparkCollectMemoryBudget(fo.getMatrixCharacteristics(), 0) ) { + if( !OptimizerUtils.checkSparkCollectMemoryBudget(mc, 0) || !_parRDDs.reserve( + OptimizerUtils.estimatePartitionedSizeExactSparsity(mc)) ) { if( fo.isDirty() ) { //write only if necessary fo.exportData(); } - rdd = getSparkContext().hadoopFile( fo.getFileName(), inputInfo2.inputFormatClass, inputInfo2.inputKeyClass, inputInfo2.inputValueClass); + rdd = sc.hadoopFile( fo.getFileName(), inputInfo2.inputFormatClass, inputInfo2.inputKeyClass, inputInfo2.inputValueClass); rdd = ((JavaPairRDD<LongWritable, FrameBlock>)rdd).mapToPair( new CopyFrameBlockPairFunction() ); //cp is workaround for read bug fromFile = true; } else { //default case FrameBlock fb = fo.acquireRead(); //pin frame in memory - rdd = toFrameJavaPairRDD(getSparkContext(), fb); + rdd = toFrameJavaPairRDD(sc, fb); fo.release(); //unpin frame + _parRDDs.registerRDD(rdd.id(), OptimizerUtils.estimatePartitionedSizeExactSparsity(mc), true); } //keep rdd handle for future operations on it @@ -456,13 +471,13 @@ public class SparkExecutionContext extends ExecutionContext // parallelize hdfs-resident file // For binary block, these are: SequenceFileInputFormat.class, MatrixIndexes.class, MatrixBlock.class if(inputInfo2 == InputInfo.BinaryBlockFrameInputInfo) { - rdd = getSparkContext().hadoopFile( fo.getFileName(), inputInfo2.inputFormatClass, inputInfo2.inputKeyClass, inputInfo2.inputValueClass); + rdd = sc.hadoopFile( fo.getFileName(), inputInfo2.inputFormatClass, inputInfo2.inputKeyClass, inputInfo2.inputValueClass); //note: this copy is still required in Spark 1.4 because spark hands out whatever the inputformat //recordreader returns; the javadoc explicitly recommend to copy all key/value pairs rdd = ((JavaPairRDD<LongWritable, FrameBlock>)rdd).mapToPair( new 
CopyFrameBlockPairFunction() ); //cp is workaround for read bug } else if(inputInfo2 == InputInfo.TextCellInputInfo || inputInfo2 == InputInfo.CSVInputInfo || inputInfo2 == InputInfo.MatrixMarketInputInfo) { - rdd = getSparkContext().hadoopFile( fo.getFileName(), inputInfo2.inputFormatClass, inputInfo2.inputKeyClass, inputInfo2.inputValueClass); + rdd = sc.hadoopFile( fo.getFileName(), inputInfo2.inputFormatClass, inputInfo2.inputKeyClass, inputInfo2.inputValueClass); rdd = ((JavaPairRDD<LongWritable, Text>)rdd).mapToPair( new CopyTextInputFunction() ); //cp is workaround for read bug } else if(inputInfo2 == InputInfo.BinaryCellInputInfo) { @@ -1108,10 +1123,13 @@ public class SparkExecutionContext extends ExecutionContext //incl deferred hdfs file removal (only if metadata set by cleanup call) if( lob instanceof RDDObject ) { RDDObject rdd = (RDDObject)lob; + int rddID = rdd.getRDD().id(); cleanupRDDVariable(rdd.getRDD()); if( rdd.getHDFSFilename()!=null ) { //deferred file removal MapReduceTool.deleteFileWithMTDIfExistOnHDFS(rdd.getHDFSFilename()); } + if( rdd.isParallelizedRDD() ) + _parRDDs.deregisterRDD(rddID); } else if( lob instanceof BroadcastObject ) { PartitionedBroadcast pbm = ((BroadcastObject)lob).getBroadcast(); @@ -1259,100 +1277,6 @@ public class SparkExecutionContext extends ExecutionContext } return false; } - - - /////////////////////////////////////////// - // Debug String Handling (see explain); TODO to be removed - /////// - - public void setDebugString(SPInstruction inst, String outputVarName) - throws DMLRuntimeException - { - RDDObject parentLineage = getMatrixObject(outputVarName).getRDDHandle(); - - if( parentLineage == null || parentLineage.getRDD() == null ) - return; - - JavaPairRDD<?, ?> out = parentLineage.getRDD(); - JavaPairRDD<?, ?> in1 = null; - JavaPairRDD<?, ?> in2 = null; - String input1VarName = null; - String input2VarName = null; - if(parentLineage.getLineageChilds() != null) { - for(LineageObject child : parentLineage.getLineageChilds()) { - if(child instanceof RDDObject) { - if(in1 == null) { - in1 = ((RDDObject) child).getRDD(); - input1VarName = child.getVarName(); - } - else if(in2 == null) { - in2 = ((RDDObject) child).getRDD(); - input2VarName = child.getVarName(); - } - else { - throw new DMLRuntimeException("PRINT_EXPLAIN_WITH_LINEAGE not yet supported for three outputs"); - } - } - } - } - setLineageInfoForExplain(inst, out, in1, input1VarName, in2, input2VarName); - } - - // The most expensive operation here is rdd.toDebugString() which can be a major hit because - // of unrolling lazy evaluation of Spark. Hence, it is guarded against it along with flag 'PRINT_EXPLAIN_WITH_LINEAGE' which is - // enabled only through MLContext. 
This way, it doesnot affect our performance evaluation through non-MLContext path - @SuppressWarnings("unused") - private void setLineageInfoForExplain(SPInstruction inst, - JavaPairRDD<?, ?> out, - JavaPairRDD<?, ?> in1, String in1Name, - JavaPairRDD<?, ?> in2, String in2Name) throws DMLRuntimeException { - - - // RDDInfo outInfo = org.apache.spark.storage.RDDInfo.fromRdd(out.rdd()); - - // First fetch start lines from input RDDs - String startLine1 = null; - String startLine2 = null; - int i1length = 0, i2length = 0; - if(in1 != null) { - String [] lines = in1.toDebugString().split("\\r?\\n"); - startLine1 = SparkUtils.getStartLineFromSparkDebugInfo(lines[0]); // lines[0].substring(4, lines[0].length()); - i1length = lines.length; - } - if(in2 != null) { - String [] lines = in2.toDebugString().split("\\r?\\n"); - startLine2 = SparkUtils.getStartLineFromSparkDebugInfo(lines[0]); // lines[0].substring(4, lines[0].length()); - i2length = lines.length; - } - - String outDebugString = ""; - int skip = 0; - - // Now process output RDD and replace inputRDD debug string by the matrix variable name - String [] outLines = out.toDebugString().split("\\r?\\n"); - for(int i = 0; i < outLines.length; i++) { - if(skip > 0) { - skip--; - // outDebugString += "\nSKIP:" + outLines[i]; - } - else if(startLine1 != null && outLines[i].contains(startLine1)) { - String prefix = SparkUtils.getPrefixFromSparkDebugInfo(outLines[i]); // outLines[i].substring(0, outLines[i].length() - startLine1.length()); - outDebugString += "\n" + prefix + "[[" + in1Name + "]]"; - //outDebugString += "\n{" + prefix + "}[[" + in1Name + "]] => " + outLines[i]; - skip = i1length - 1; - } - else if(startLine2 != null && outLines[i].contains(startLine2)) { - String prefix = SparkUtils.getPrefixFromSparkDebugInfo(outLines[i]); // outLines[i].substring(0, outLines[i].length() - startLine2.length()); - outDebugString += "\n" + prefix + "[[" + in2Name + "]]"; - skip = i2length - 1; - } - else { - outDebugString += "\n" + outLines[i]; - } - } - - } - /////////////////////////////////////////// // Spark configuration handling @@ -1610,4 +1534,41 @@ public class SparkExecutionContext extends ExecutionContext return sb.toString(); } } + + private static class MemoryManagerParRDDs + { + private final long _limit; + private long _size; + private HashMap<Integer, Long> _rdds; + + public MemoryManagerParRDDs(double fractionMem) { + _limit = (long)(fractionMem * InfrastructureAnalyzer.getLocalMaxMemory()); + _size = 0; + _rdds = new HashMap<Integer, Long>(); + } + + public synchronized boolean reserve(long rddSize) { + boolean ret = (rddSize + _size < _limit); + _size += ret ? 
rddSize : 0; + return ret; + } + + public synchronized void registerRDD(int rddID, long rddSize, boolean reserved) { + if( !reserved ) { + throw new RuntimeException("Unsupported rdd registration " + + "without size reservation for "+rddSize+" bytes."); + } + _rdds.put(rddID, rddSize); + } + + public synchronized void deregisterRDD(int rddID) { + long rddSize = _rdds.remove(rddID); + _size -= rddSize; + } + + public synchronized void clear() { + _size = 0; + _rdds.clear(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c7f5f086/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java index fd5dad3..67f97a3 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java @@ -29,24 +29,22 @@ public class RDDObject extends LineageObject private boolean _checkpointed = false; //created via checkpoint instruction private boolean _hdfsfile = false; //created from hdfs file private String _hdfsFname = null; //hdfs filename, if created from hdfs. + private boolean _parRDD = false; public RDDObject( JavaPairRDD<?,?> rddvar, String varName) { super(varName); _rddHandle = rddvar; } - public JavaPairRDD<?,?> getRDD() - { + public JavaPairRDD<?,?> getRDD() { return _rddHandle; } - public void setCheckpointRDD( boolean flag ) - { + public void setCheckpointRDD( boolean flag ) { _checkpointed = flag; } - public boolean isCheckpointRDD() - { + public boolean isCheckpointRDD() { return _checkpointed; } @@ -66,6 +64,14 @@ public class RDDObject extends LineageObject return _hdfsFname; } + public void setParallelizedRDD( boolean flag ) { + _parRDD = flag; + } + + public boolean isParallelizedRDD() { + return _parRDD; + } + /** * Indicates if rdd is an hdfs file or a checkpoint over an hdfs file;
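To make the new reservation lifecycle concrete, here is a simplified, standalone sketch modeled on the MemoryManagerParRDDs class added by this patch. The class name, the main driver, the literal RDD id, the defensive null check, and the use of Runtime.maxMemory() (instead of InfrastructureAnalyzer.getLocalMaxMemory()) are assumptions for the example; in SystemML the manager is a private static member of SparkExecutionContext, registration happens when an in-memory matrix or frame is parallelized, and deregistration happens during RDD variable cleanup:

import java.util.HashMap;

//Simplified, standalone sketch of the parallelized-RDD reservation lifecycle.
public class ParRDDMemoryManagerSketch
{
	private final long _limit; //budget, e.g., 10% of max heap
	private long _size = 0;    //currently reserved bytes
	private final HashMap<Integer, Long> _rdds = new HashMap<Integer, Long>();

	public ParRDDMemoryManagerSketch(double fractionMem) {
		//real code derives the limit from InfrastructureAnalyzer.getLocalMaxMemory()
		_limit = (long) (fractionMem * Runtime.getRuntime().maxMemory());
	}

	//try to reserve space; on failure the caller falls back to HDFS export
	public synchronized boolean reserve(long rddSize) {
		boolean ret = (rddSize + _size < _limit);
		_size += ret ? rddSize : 0;
		return ret;
	}

	//associate a previously reserved size with the parallelized RDD's id
	public synchronized void registerRDD(int rddID, long rddSize) {
		_rdds.put(rddID, rddSize);
	}

	//release the reservation when the RDD variable is cleaned up
	public synchronized void deregisterRDD(int rddID) {
		Long rddSize = _rdds.remove(rddID);
		if( rddSize != null )
			_size -= rddSize;
	}

	public static void main(String[] args) {
		ParRDDMemoryManagerSketch parRDDs = new ParRDDMemoryManagerSketch(0.1);
		long estSize = 64L << 20; //hypothetical partitioned-size estimate (64MB)
		if( parRDDs.reserve(estSize) ) {
			parRDDs.registerRDD(42, estSize); //42: hypothetical rdd.id()
			//... run CP operations, broadcasts, collects ...
			parRDDs.deregisterRDD(42);        //on cleanup of the RDD variable
		}
		else {
			//export the matrix/frame to HDFS and create the RDD from file
		}
	}
}

Note how a failed reserve() simply routes the request through the existing export-and-read-from-HDFS path, so the 10% budget bounds driver memory consumed by parallelized RDDs without rejecting any operation.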
