[SYSTEMML-1269] Performance of binary block matrix RDD collects. This patch improves the performance and memory efficiency of collecting binary block matrix RDDs, especially for sparse matrices. In detail, this includes:
* Matrix output allocation with estimated number of non-zeros, which impacts the growth rate of sparse rows (2x until ennz, 1.1x afterwards or if unknown) and hence the number of reallocations. * Shallow sparse row copy in case of single column blocks which reduces the temporary memory consumption and avoids unnecessary sparse row copies. This shallow copy is safe even if the collect operation does not copy blocks (e.g., if they come from local memory) because no other sparse rows will be merged in and any subsequent update in-place prepares its own variables via a single deep copy. * Removed non-zero recomputation to avoid an unnecessary scan over the output matrix. Each block already carries its nnz and we now simply incrementally maintain the global nnz. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/201238fd Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/201238fd Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/201238fd Branch: refs/heads/master Commit: 201238fd33cad976e681226ed5fc99250375450c Parents: 732e6da Author: Matthias Boehm <[email protected]> Authored: Tue Feb 14 23:21:32 2017 -0800 Committer: Matthias Boehm <[email protected]> Committed: Wed Feb 15 10:49:23 2017 -0800 ---------------------------------------------------------------------- .../context/SparkExecutionContext.java | 20 +++++++++++++------- .../sysml/runtime/matrix/data/MatrixBlock.java | 8 ++++++-- 2 files changed, 19 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/201238fd/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java index d1e521a..66fab1e 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java @@ -777,13 +777,14 @@ public class SparkExecutionContext extends ExecutionContext //determine target sparse/dense representation long lnnz = (nnz >= 0) ? nnz : (long)rlen * clen; boolean sparse = MatrixBlock.evalSparseFormatInMemory(rlen, clen, lnnz); - + //create output matrix block (w/ lazy allocation) - out = new MatrixBlock(rlen, clen, sparse); + out = new MatrixBlock(rlen, clen, sparse, lnnz); List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect(); //copy blocks one-at-a-time into output matrix block + long aNnz = 0; for( Tuple2<MatrixIndexes,MatrixBlock> keyval : list ) { //unpack index-block pair @@ -796,21 +797,26 @@ public class SparkExecutionContext extends ExecutionContext int rows = block.getNumRows(); int cols = block.getNumColumns(); + //append block if( sparse ) { //SPARSE OUTPUT - //append block to sparse target in order to avoid shifting - //note: this append requires a final sort of sparse rows - out.appendToSparse(block, row_offset, col_offset); + //append block to sparse target in order to avoid shifting, where + //we use a shallow row copy in case of MCSR and single column blocks + //note: this append requires, for multiple column blocks, a final sort + out.appendToSparse(block, row_offset, col_offset, clen>bclen); } else { //DENSE OUTPUT out.copy( row_offset, row_offset+rows-1, col_offset, col_offset+cols-1, block, false ); } + + //incremental maintenance nnz + aNnz += block.getNonZeros(); } //post-processing output matrix - if( sparse ) + if( sparse && clen>bclen ) out.sortSparseRows(); - out.recomputeNonZeros(); + out.setNonZeros(aNnz); out.examSparsity(); } 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/201238fd/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index a6de5ec..58e2034 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -695,7 +695,11 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab } } - public void appendToSparse( MatrixBlock that, int rowoffset, int coloffset ) + public void appendToSparse( MatrixBlock that, int rowoffset, int coloffset ) { + appendToSparse(that, rowoffset, coloffset, true); + } + + public void appendToSparse( MatrixBlock that, int rowoffset, int coloffset, boolean deep ) { if( that==null || that.isEmptyBlock(false) ) return; //nothing to append @@ -713,7 +717,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab //single block append (avoid re-allocations) if( sparseBlock.isEmpty(aix) && coloffset==0 ) { - sparseBlock.set(aix, b.get(i), true); + sparseBlock.set(aix, b.get(i), deep); } else { //general case int pos = b.pos(i);
