Repository: incubator-systemml
Updated Branches:
  refs/heads/master b6a46500d -> b78c12593
[SYSTEMML-1350] Performance parfor spark datapartition-execute jobs

This patch makes the following performance and robustness improvements to the
parfor spark datapartition-execute and datapartition jobs:

(1) Data-size-dependent number of reduce tasks. So far, we used at most the
number of cores in order to achieve the best pre-aggregation of results.
However, on spark, too small a number of reduce tasks (and hence too large
reduce partitions) can lead to 2GB limit issues and OOMs due to increased
memory pressure. We now determine the number of reduce tasks in a more
careful, data-size-dependent manner.

(2) Reuse of matrix block partitions. For dense matrix block partitions, we
now reuse already allocated partitions in order to reduce GC overheads.

(3) Incremental nnz maintenance. Finally, we now also incrementally maintain
the nnz during partition collect based on the block meta data instead of
recomputing it, which avoids an unnecessary scan per partition.

Together, these changes improved the runtime of perftest 80GB
univariate/bivariate from 379s/466s to 337s/376s on a small 1+5 node cluster,
while also ensuring much better robustness for larger datasets.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/baa70a15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/baa70a15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/baa70a15

Branch: refs/heads/master
Commit: baa70a150b7e01f81f72ac1ce0ca07fab9a18265
Parents: b6a4650
Author: Matthias Boehm <[email protected]>
Authored: Fri Feb 24 15:02:53 2017 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Sat Feb 25 11:51:02 2017 -0800

----------------------------------------------------------------------
 .../parfor/DataPartitionerRemoteSpark.java      |  36 ++++---
 .../DataPartitionerRemoteSparkReducer.java      |  20 ++--
 .../parfor/RemoteDPParForSpark.java             |  28 +++--
 .../parfor/RemoteDPParForSparkWorker.java       | 107 ++++++++-----------
 4 files changed, 89 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/baa70a15/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
index 41fb235..be758d2 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
@@ -27,6 +27,7 @@ import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartition
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -66,26 +67,24 @@ public class DataPartitionerRemoteSpark extends DataPartitioner
 
 		try
 		{
-			//cleanup existing output files
-			MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
-
-			//determine degree of parallelism
-			int numRed = (int)determineNumReducers(rlen, clen, brlen, bclen, _numRed);
-
+			//cleanup existing output files
+			MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
+
 			//get input rdd
 			JavaPairRDD<MatrixIndexes, MatrixBlock> inRdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
 				sec.getRDDHandleForMatrixObject(in, InputInfo.BinaryBlockInputInfo);
-			MatrixCharacteristics mc = in.getMatrixCharacteristics();
 
+			//determine degree of parallelism
+			MatrixCharacteristics mc = in.getMatrixCharacteristics();
+			int numRed = (int)determineNumReducers(inRdd, mc, _numRed);
+
 			//run spark remote data partition job
 			DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, _format);
 			DataPartitionerRemoteSparkReducer wfun = new DataPartitionerRemoteSparkReducer(fnameNew, oi);
 			inRdd.flatMapToPair(dpfun) //partition the input blocks
 			     .groupByKey(numRed) //group partition blocks
-			     .foreach( wfun ); //write partitions to hdfs
+			     .foreach(wfun); //write partitions to hdfs
 		}
-		catch(Exception ex)
-		{
+		catch(Exception ex) {
 			throw new DMLRuntimeException(ex);
 		}
 
@@ -97,12 +96,17 @@ public class DataPartitionerRemoteSpark extends DataPartitioner
 		}
 	}
 
-	private long determineNumReducers(long rlen, long clen, int brlen, int bclen, long numRed)
+	private long determineNumReducers(JavaPairRDD<MatrixIndexes,MatrixBlock> in,
+		MatrixCharacteristics mc, long numRed)
 	{
-		//set the number of mappers and reducers
+		long rlen = mc.getRows();
+		long clen = mc.getCols();
+		int brlen = mc.getRowsPerBlock();
+		int bclen = mc.getColsPerBlock();
+
+		//determine number of reducer groups
 		long reducerGroups = -1;
-		switch( _format )
-		{
+		switch( _format ) {
 			case ROW_WISE: reducerGroups = rlen; break;
 			case COLUMN_WISE: reducerGroups = clen; break;
 			case ROW_BLOCK_WISE: reducerGroups = (rlen/brlen)+((rlen%brlen==0)?0:1); break;
@@ -113,6 +117,8 @@ public class DataPartitionerRemoteSpark extends DataPartitioner
 				//do nothing
 		}
 
-		return (int)Math.min( numRed, reducerGroups);
+		//compute number of reducers (to avoid OOMs and reduce memory pressure)
+		int numParts = SparkUtils.getNumPreferredPartitions(mc, in);
+		return Math.max(numRed, Math.min(numParts, reducerGroups));
 	}
 }
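For readers skimming the diff, the core of change (1) is the new reducer
heuristic at the end of this file: instead of simply capping the reducer count
at the configured value, it also takes the data-size-dependent preferred
partition count into account. A minimal sketch of that shape (helper name and
parameters below are illustrative, not part of the patch):

    // sketch: pick a reducer count that is data-size aware, never exceeds the
    // number of reducer groups (rows/columns/blocks), and never falls below
    // the configured parallelism
    static long chooseNumReducers(long configuredRed, long reducerGroups, int preferredParts) {
        return Math.max(configuredRed, Math.min(preferredParts, reducerGroups));
    }
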

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/baa70a15/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
index 8caac98..d8bb04d 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
@@ -30,8 +30,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
+import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
@@ -40,16 +40,12 @@ import scala.Tuple2;
 
 public class DataPartitionerRemoteSparkReducer implements VoidFunction<Tuple2<Long, Iterable<Writable>>>
 {
-	
 	private static final long serialVersionUID = -7149865018683261964L;
 
 	private String _fnameNew = null;
 
-	public DataPartitionerRemoteSparkReducer(String fnameNew, OutputInfo oi)
-		throws DMLRuntimeException
-	{
+	public DataPartitionerRemoteSparkReducer(String fnameNew, OutputInfo oi) {
 		_fnameNew = fnameNew;
-		//_oi = oi;
 	}
 
 	@Override
@@ -69,17 +65,13 @@ public class DataPartitionerRemoteSparkReducer implements VoidFunction<Tuple2<Lo
 			FileSystem fs = FileSystem.get(job);
 			Path path = new Path(_fnameNew + File.separator + key);
 			writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class);
-			while( valueList.hasNext() )
-			{
+			while( valueList.hasNext() ) {
 				PairWritableBlock pair = (PairWritableBlock) valueList.next();
 				writer.append(pair.indexes, pair.block);
 			}
 		}
-		finally
-		{
-			if( writer != null )
-				writer.close();
+		finally {
+			IOUtilFunctions.closeSilently(writer);
 		}
-	}
-	
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/baa70a15/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
index 8663038..0c4b570 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
@@ -36,6 +36,7 @@ import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartition
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.MatrixDimensionsMetaData;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
@@ -55,10 +56,9 @@ public class RemoteDPParForSpark
 
 	protected static final Log LOG = LogFactory.getLog(RemoteDPParForSpark.class.getName());
 
-	public static RemoteParForJobReturn runJob(long pfid, String itervar, String matrixvar, String program, String resultFile, MatrixObject input,
-			ExecutionContext ec,
-			PDataPartitionFormat dpf, OutputInfo oi, boolean tSparseCol, //config params
-			boolean enableCPCaching, int numReducers ) //opt params
+	public static RemoteParForJobReturn runJob(long pfid, String itervar, String matrixvar, String program, String resultFile,
+			MatrixObject input, ExecutionContext ec, PDataPartitionFormat dpf, OutputInfo oi, boolean tSparseCol, //config params
+			boolean enableCPCaching, int numReducers ) //opt params
 		throws DMLRuntimeException
 	{
 		String jobname = "ParFor-DPESP";
@@ -71,20 +71,26 @@ public class RemoteDPParForSpark
 		MatrixDimensionsMetaData md = (MatrixDimensionsMetaData) input.getMetaData();
 		MatrixCharacteristics mc = md.getMatrixCharacteristics();
 		InputInfo ii = InputInfo.BinaryBlockInputInfo;
-		
-		//initialize accumulators for tasks/iterations
+
+		//initialize accumulators for tasks/iterations, and inputs
+		JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable(matrixvar);
 		LongAccumulator aTasks = sc.sc().longAccumulator("tasks");
 		LongAccumulator aIters = sc.sc().longAccumulator("iterations");
+
+		//compute number of reducers (to avoid OOMs and reduce memory pressure)
+		int numParts = SparkUtils.getNumPreferredPartitions(mc, in);
+		int numParts2 = (int)((dpf==PDataPartitionFormat.ROW_BLOCK_WISE) ? mc.getRows() : mc.getCols());
+		int numReducers2 = Math.max(numReducers, Math.min(numParts, numParts2));
 
-		JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable(matrixvar);
+		//core parfor datapartition-execute
 		DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, dpf);
 		RemoteDPParForSparkWorker efun = new RemoteDPParForSparkWorker(program, matrixvar, itervar,
 				enableCPCaching, mc, tSparseCol, dpf, oi, aTasks, aIters);
 		List<Tuple2<Long,String>> out =
-				in.flatMapToPair(dpfun)        //partition the input blocks
-				  .groupByKey(numReducers)     //group partition blocks
-				  .mapPartitionsToPair( efun ) //execute parfor tasks, incl cleanup
-				  .collect();                  //get output handles
+			in.flatMapToPair(dpfun)       //partition the input blocks
+			  .groupByKey(numReducers2)   //group partition blocks
+			  .mapPartitionsToPair(efun)  //execute parfor tasks, incl cleanup
+			  .collect();                 //get output handles
 
 		//de-serialize results
 		LocalVariableMap[] results = RemoteParForUtils.getResults(out, LOG);
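The reason the reducer count matters here is the size of each reduce partition
after groupByKey: with only a handful of reducers, a single grouped partition
can approach Spark's 2GB block limit and drive up memory pressure. As a purely
illustrative back-of-the-envelope sketch of what a data-size-dependent
partition count means (the actual estimate is encapsulated in
SparkUtils.getNumPreferredPartitions; the 128MB target below is an assumption
for illustration, not taken from the patch):

    // hypothetical sizing sketch: choose enough reduce partitions that each one
    // stays near a small target size, far below Spark's 2GB block limit
    static int numPartitionsForSize(double estSizeBytes) {
        final double targetBytes = 128.0 * 1024 * 1024; // assumed 128MB per partition
        return (int) Math.max(1, Math.ceil(estSizeBytes / targetBytes));
    }
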

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/baa70a15/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index e12d010..ad0fbf8 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -47,27 +47,25 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 {
 	private static final long serialVersionUID = 30223759283155139L;
 
-	private String _prog = null;
-	private boolean _caching = true;
-	private String _inputVar = null;
-	private String _iterVar = null;
+	private final String _prog;
+	private final boolean _caching;
+	private final String _inputVar;
+	private final String _iterVar;
 
-	private OutputInfo _oinfo = null;
-	private int _rlen = -1;
-	private int _clen = -1;
-	private int _brlen = -1;
-	private int _bclen = -1;
-	private boolean _tSparseCol = false;
-	private PDataPartitionFormat _dpf = null;
+	private final OutputInfo _oinfo;
+	private final int _rlen;
+	private final int _clen;
+	private final int _brlen;
+	private final int _bclen;
+	private final boolean _tSparseCol;
+	private final PDataPartitionFormat _dpf;
 
-	private LongAccumulator _aTasks = null;
-	private LongAccumulator _aIters = null;
+	private final LongAccumulator _aTasks;
+	private final LongAccumulator _aIters;
 
 	public RemoteDPParForSparkWorker(String program, String inputVar, String iterVar, boolean cpCaching, MatrixCharacteristics mc, boolean tSparseCol, PDataPartitionFormat dpf, OutputInfo oinfo, LongAccumulator atasks, LongAccumulator aiters)
 		throws DMLRuntimeException
 	{
-		//keep inputs (unfortunately, spark does not expose task ids and it would be implementation-dependent
-		//when this constructor is actually called; hence, we do lazy initialization on task execution)
 		_prog = program;
 		_caching = cpCaching;
 		_inputVar = inputVar;
@@ -78,18 +76,13 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 		_aTasks = atasks;
 		_aIters = aiters;
 
-		//setup matrixblock partition and meta data
-		_rlen = (int)mc.getRows();
-		_clen = (int)mc.getCols();
+		//setup matrix block partition meta data
+		_rlen = (dpf != PDataPartitionFormat.ROW_WISE) ? (int)mc.getRows() : 1;
+		_clen = (dpf != PDataPartitionFormat.COLUMN_WISE) ? (int)mc.getCols() : 1;
 		_brlen = mc.getRowsPerBlock();
 		_bclen = mc.getColsPerBlock();
 		_tSparseCol = tSparseCol;
 		_dpf = dpf;
-		switch( _dpf ) { //create matrix partition for reuse
-			case ROW_WISE: _rlen = 1; break;
-			case COLUMN_WISE: _clen = 1; break;
-			default: throw new RuntimeException("Partition format not yet supported in fused partition-execute: "+dpf);
-		}
 	}
 
 	@Override
@@ -102,14 +95,14 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 		configureWorker( TaskContext.get().taskAttemptId() ); //requires Spark 1.3
 
 		//process all matrix partitions of this data partition
+		MatrixBlock partition = null;
 		while( arg0.hasNext() )
 		{
 			Tuple2<Long,Iterable<Writable>> larg = arg0.next();
 
 			//collect input partition (check via equals because oinfo deserialized instance)
-			MatrixBlock partition = null;
 			if( _oinfo.equals(OutputInfo.BinaryBlockOutputInfo) )
-				partition = collectBinaryBlock( larg._2() );
+				partition = collectBinaryBlock( larg._2(), partition );
 			else
 				partition = collectBinaryCellInput( larg._2() );
 
@@ -178,42 +171,44 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 	 * will overwrite the result.
 	 *
 	 * @param valueList iterable writables
+	 * @param reuse matrix block partition for reuse
 	 * @return matrix block
 	 * @throws IOException if IOException occurs
 	 */
-	private MatrixBlock collectBinaryBlock( Iterable<Writable> valueList )
+	private MatrixBlock collectBinaryBlock( Iterable<Writable> valueList, MatrixBlock reuse )
 		throws IOException
 	{
-		MatrixBlock partition = null;
+		MatrixBlock partition = reuse;
 
 		try
 		{
 			//reset reuse block, keep configured representation
 			if( _tSparseCol )
 				partition = new MatrixBlock(_clen, _rlen, true);
+			else if( partition!=null )
+				partition.reset(_rlen, _clen, false);
 			else
 				partition = new MatrixBlock(_rlen, _clen, false);
 
-			for( Writable val : valueList )
-			{
-				PairWritableBlock pairValue = (PairWritableBlock) val;
-				int row_offset = (int)(pairValue.indexes.getRowIndex()-1)*_brlen;
-				int col_offset = (int)(pairValue.indexes.getColumnIndex()-1)*_bclen;
-				MatrixBlock block = pairValue.block;
+			long lnnz = 0;
+			for( Writable val : valueList ) {
+				PairWritableBlock pval = (PairWritableBlock) val;
+				int row_offset = (int)(pval.indexes.getRowIndex()-1)*_brlen;
+				int col_offset = (int)(pval.indexes.getColumnIndex()-1)*_bclen;
 				if( !partition.isInSparseFormat() ) //DENSE
-				{
-					partition.copy( row_offset, row_offset+block.getNumRows()-1,
-							col_offset, col_offset+block.getNumColumns()-1,
-							pairValue.block, false );
-				}
+					partition.copy( row_offset, row_offset+pval.block.getNumRows()-1,
+						col_offset, col_offset+pval.block.getNumColumns()-1,
+						pval.block, false );
 				else //SPARSE
-				{
-					partition.appendToSparse(pairValue.block, row_offset, col_offset);
-				}
+					partition.appendToSparse(pval.block, row_offset, col_offset);
+				lnnz += pval.block.getNonZeros();
 			}
 
-			//final partition cleanup
-			cleanupCollectedMatrixPartition( partition, partition.isInSparseFormat() );
+			//post-processing: cleanups if required
+			if( partition.isInSparseFormat() && _clen>_bclen )
+				partition.sortSparseRows();
+			partition.setNonZeros(lnnz);
+			partition.examSparsity();
 		}
 		catch(DMLRuntimeException ex)
 		{
@@ -273,29 +268,17 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 				throw new IOException("Partition format not yet supported in fused partition-execute: "+_dpf);
 		}
 
-		//final partition cleanup
-		cleanupCollectedMatrixPartition(partition, _tSparseCol);
-
-		return partition;
-	}
-
-	private void cleanupCollectedMatrixPartition(MatrixBlock partition, boolean sort)
-		throws IOException
-	{
-		//sort sparse row contents if required
-		if( partition.isInSparseFormat() && sort )
-			partition.sortSparseRows();
-
-		//ensure right number of nnz
-		if( !partition.isInSparseFormat() )
-			partition.recomputeNonZeros();
-
-		//exam and switch dense/sparse representation
+		//post-processing: cleanups if required
 		try {
+			if( partition.isInSparseFormat() && _tSparseCol )
+				partition.sortSparseRows();
+			partition.recomputeNonZeros();
 			partition.examSparsity();
 		}
-		catch(Exception ex){
+		catch(DMLRuntimeException ex) {
 			throw new IOException(ex);
 		}
+
+		return partition;
 	}
 }
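Changes (2) and (3) of the commit message both live in collectBinaryBlock
above: the dense partition block is reset and reused across data partitions
instead of being reallocated, and the nnz is accumulated from the incoming
blocks' meta data instead of being recomputed by a scan. A condensed sketch of
that collect pattern for the dense case (simplified and renamed for
illustration; it follows the MatrixBlock/PairWritableBlock usage visible in
the diff, but is not the literal patched method):

    // collect blocks into a reusable dense partition, maintaining nnz incrementally
    private MatrixBlock collectDensePartition(Iterable<PairWritableBlock> blocks, MatrixBlock reuse) {
        MatrixBlock partition = reuse;
        if( partition != null )
            partition.reset(_rlen, _clen, false);             // reuse: clear values, keep dense allocation
        else
            partition = new MatrixBlock(_rlen, _clen, false); // first partition: allocate once
        long lnnz = 0;
        for( PairWritableBlock pval : blocks ) {
            int roff = (int)(pval.indexes.getRowIndex()-1)*_brlen;
            int coff = (int)(pval.indexes.getColumnIndex()-1)*_bclen;
            partition.copy(roff, roff+pval.block.getNumRows()-1,
                coff, coff+pval.block.getNumColumns()-1, pval.block, false);
            lnnz += pval.block.getNonZeros();                 // incremental nnz from block meta data
        }
        partition.setNonZeros(lnnz);                          // avoids a recomputeNonZeros() scan
        return partition;
    }

On the caller side (call() above), the same partition object is threaded
through successive collectBinaryBlock invocations, so a dense task processes
many data partitions with a single allocation.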
