[SYSTEMML-1753] Fix memory efficiency of parfor in-memory result merge This patch fixes out-of-memory issues with parfor in-memory result merge for moderately large outputs. In case of result merge without compare (i.e., an initially empty output matrix), we currently allocate the output in sparse format, collect all outputs into this sparse representation, and finally convert it into a dense output. This is unnecessarily inefficient in terms of both memory requirements (a dense matrix held in sparse format plus the dense output) and performance, because for result merge without compare the number of non-zeros of the output is known exactly: it is the sum of the numbers of non-zeros of all inputs. We now use a much better estimate of the number of non-zeros, which reduces the memory requirements by more than 2x in the above scenario. Furthermore, we now also remove any remaining intermediates (other than result variables) from the workers before result merge to reduce memory pressure in scenarios without buffer pool eviction (e.g., training through JMLC).
-> remove non-result variables before result merge. fix2 Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/73f7c6a7 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/73f7c6a7 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/73f7c6a7 Branch: refs/heads/master Commit: 73f7c6a7776a46435042fba3784af3d2368f7703 Parents: 8b83ab5 Author: Matthias Boehm <[email protected]> Authored: Sat Jul 8 18:31:28 2017 -0700 Committer: Matthias Boehm <[email protected]> Committed: Sat Jul 8 22:32:04 2017 -0700 ---------------------------------------------------------------------- .../controlprogram/LocalVariableMap.java | 14 +++--- .../controlprogram/ParForProgramBlock.java | 12 ++--- .../parfor/ResultMergeLocalMemory.java | 50 ++++++++++++-------- .../sysml/runtime/matrix/data/MatrixBlock.java | 4 +- 4 files changed, 47 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/73f7c6a7/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java b/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java index 63757ac..d60ef24 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java @@ -78,20 +78,22 @@ public class LocalVariableMap implements Cloneable * @param name the variable name for the data value * @param val the data value object (such as envelope) */ - public void put(String name, Data val) - { + public void put(String name, Data val) { localMap.put( name, val ); } - public Data remove( String name ) - { + public Data remove( String name ) { return localMap.remove( name ); } - public void 
removeAll() - { + public void removeAll() { localMap.clear(); } + + public void removeAllNotIn(Set<String> blacklist) { + localMap.entrySet().removeIf( + e -> !blacklist.contains(e.getKey())); + } public boolean hasReferences( Data d ) { http://git-wip-us.apache.org/repos/asf/systemml/blob/73f7c6a7/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java index 32f105b..a2d361c 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java @@ -802,17 +802,17 @@ public class ParForProgramBlock extends ForProgramBlock // Step 4) collecting results from each parallel worker - //obtain results + //obtain results and cleanup other intermediates before result merge LocalVariableMap [] localVariables = new LocalVariableMap [_numThreads]; - for( int i=0; i<_numThreads; i++ ) - { + for( int i=0; i<_numThreads; i++ ) { localVariables[i] = workers[i].getVariables(); + localVariables[i].removeAllNotIn(new HashSet<String>(_resultVars)); numExecutedTasks += workers[i].getExecutedTasks(); numExecutedIterations += workers[i].getExecutedIterations(); } //consolidate results into global symbol table - consolidateAndCheckResults( ec, numIterations, numCreatedTasks, numExecutedIterations, numExecutedTasks, - localVariables ); + consolidateAndCheckResults( ec, numIterations, numCreatedTasks, + numExecutedIterations, numExecutedTasks, localVariables ); // Step 5) cleanup local parworkers (e.g., remove created functions) for( int i=0; i<_numThreads; i++ ) @@ -1734,7 +1734,7 @@ public class ParForProgramBlock extends ForProgramBlock { //execute result merge sequentially for all result vars for( String var : _resultVars ) 
//foreach non-local write - { + { Data dat = ec.getVariable(var); if( dat instanceof MatrixObject ) //robustness scalars { http://git-wip-us.apache.org/repos/asf/systemml/blob/73f7c6a7/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java index a1962e4..fdd3d97 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java @@ -61,14 +61,16 @@ public class ResultMergeLocalMemory extends ResultMerge try { - //get matrix blocks through caching + //get old output matrix from cache for compare MatrixBlock outMB = _output.acquireRead(); - //get old output matrix from cache for compare - int estnnz = outMB.getNumRows()*outMB.getNumColumns(); - MatrixBlock outMBNew = new MatrixBlock(outMB.getNumRows(), outMB.getNumColumns(), - outMB.isInSparseFormat(), estnnz); + //create output matrices in correct format according to + //the estimated number of non-zeros + long estnnz = getOutputNnzEstimate(); + MatrixBlock outMBNew = new MatrixBlock( + outMB.getNumRows(), outMB.getNumColumns(), estnnz); boolean appendOnly = outMBNew.isInSparseFormat(); + outMBNew.allocateDenseOrSparseBlock(); //create compare matrix if required (existing data in result) _compare = createCompareMatrix(outMB); @@ -80,7 +82,7 @@ public class ResultMergeLocalMemory extends ResultMerge for( MatrixObject in : _inputs ) { //check for empty inputs (no iterations executed) - if( in !=null && in != _output ) + if( in != null && in != _output ) { LOG.trace("ResultMerge (local, in-memory): Merge input "+in.getVarName()+" (fname="+in.getFileName()+")"); @@ -97,7 +99,7 @@ public class 
ResultMergeLocalMemory extends ResultMerge //determine need for sparse2dense change during merge boolean sparseToDense = appendOnly && !MatrixBlock.evalSparseFormatInMemory( - outMBNew.getNumRows(), outMBNew.getNumColumns(), outMBNew.getNonZeros()); + outMBNew.getNumRows(), outMBNew.getNumColumns(), outMBNew.getNonZeros()); if( sparseToDense ) { outMBNew.sortSparseRows(); //sort sparse due to append-only outMBNew.examSparsity(); //sparse-dense representation change @@ -129,8 +131,7 @@ public class ResultMergeLocalMemory extends ResultMerge //release old output, and all inputs _output.release(); } - catch(Exception ex) - { + catch(Exception ex) { throw new DMLRuntimeException(ex); } @@ -144,13 +145,9 @@ public class ResultMergeLocalMemory extends ResultMerge throws DMLRuntimeException { MatrixObject moNew = null; //always create new matrix object (required for nested parallelism) - - //Timing time = null; + LOG.trace("ResultMerge (local, in-memory): Execute parallel (par="+par+") merge for output "+_output.getVarName()+" (fname="+_output.getFileName()+")"); - // time = new Timing(); - // time.start(); - try { //get matrix blocks through caching @@ -211,10 +208,8 @@ public class ResultMergeLocalMemory extends ResultMerge //release old output, and all inputs _output.release(); - //_output.clearData(); //save, since it respects pin/unpin } - catch(Exception ex) - { + catch(Exception ex) { throw new DMLRuntimeException(ex); } @@ -228,8 +223,7 @@ public class ResultMergeLocalMemory extends ResultMerge double[][] ret = null; //create compare matrix only if required - if( output.getNonZeros() > 0 ) - { + if( output.getNonZeros() > 0 ) { ret = DataConverter.convertToDoubleMatrix( output ); } @@ -289,6 +283,24 @@ public class ResultMergeLocalMemory extends ResultMerge mergeWithComp(out, in, _compare); } + /** + * Estimates the number of non-zeros in the final merged output. 
+ * For scenarios without compare matrix, this is the exact number + * of non-zeros due to guaranteed disjoint results per worker. + * + * @return estimated number of non-zeros. + */ + private long getOutputNnzEstimate() { + long nnzInputs = 0; + for( MatrixObject input : _inputs ) + if( input != null ) + nnzInputs += Math.max(input.getNnz(),1); + long rlen = _output.getNumRows(); + long clen = _output.getNumColumns(); + return Math.min(rlen * clen, + Math.max(nnzInputs, _output.getNnz())); + } + /** * NOTE: only used if matrix in dense http://git-wip-us.apache.org/repos/asf/systemml/blob/73f7c6a7/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index 43adfc6..16051dc 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -1033,7 +1033,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab * @return true if matrix block shold be in sparse format in memory */ public static boolean evalSparseFormatInMemory( final long nrows, final long ncols, final long nnz ) - { + { //evaluate sparsity threshold double lsparsity = (double)nnz/nrows/ncols; boolean lsparse = (lsparsity < SPARSITY_TURN_POINT); @@ -1686,7 +1686,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab throw new DMLRuntimeException("Number of non-zeros mismatch on merge disjoint (target="+rlen+"x"+clen+", nnz target="+nonZeros+", nnz source="+that.nonZeros+")"); //check for empty target (copy in full) - if( isEmptyBlock(false) ) { + if( isEmptyBlock(false) && !(!sparse && isAllocated()) ) { copy(that); return; }
