[SYSTEMML-1269] Improve performance of binary block matrix RDD collects

This patch improves the performance and memory efficiency of collecting
binary block matrix RDDs, especially for sparse matrices. In detail,
this includes:

* Allocation of the output matrix with the estimated number of non-zeros
(nnz), which determines the growth rate of sparse rows (2x until the
estimated nnz is reached, 1.1x afterwards or if the nnz is unknown) and
hence the number of reallocations.

* Shallow sparse row copies if the output consists of a single column
block (clen <= bclen), which reduces temporary memory consumption and
avoids unnecessary sparse row copies. This shallow copy is safe even if
the collect operation does not copy blocks (e.g., if they come from local
memory) because no other sparse rows will be merged in, and any
subsequent in-place update prepares its own variables via a single deep
copy.

* Removed the non-zero recomputation to avoid an unnecessary scan over
the output matrix. Each block already carries its nnz, so we now simply
maintain the global nnz incrementally.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/201238fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/201238fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/201238fd

Branch: refs/heads/master
Commit: 201238fd33cad976e681226ed5fc99250375450c
Parents: 732e6da
Author: Matthias Boehm <[email protected]>
Authored: Tue Feb 14 23:21:32 2017 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Wed Feb 15 10:49:23 2017 -0800

----------------------------------------------------------------------
 .../context/SparkExecutionContext.java          | 20 +++++++++++++-------
 .../sysml/runtime/matrix/data/MatrixBlock.java  |  8 ++++++--
 2 files changed, 19 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/201238fd/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index d1e521a..66fab1e 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -777,13 +777,14 @@ public class SparkExecutionContext extends ExecutionContext
                        //determine target sparse/dense representation
                        long lnnz = (nnz >= 0) ? nnz : (long)rlen * clen;
                        boolean sparse = 
MatrixBlock.evalSparseFormatInMemory(rlen, clen, lnnz);
-                                               
+                       
                        //create output matrix block (w/ lazy allocation)
-                       out = new MatrixBlock(rlen, clen, sparse);
+                       out = new MatrixBlock(rlen, clen, sparse, lnnz);
                        
                        List<Tuple2<MatrixIndexes,MatrixBlock>> list = 
rdd.collect();
                        
                        //copy blocks one-at-a-time into output matrix block
+                       long aNnz = 0; 
                        for( Tuple2<MatrixIndexes,MatrixBlock> keyval : list )
                        {
                                //unpack index-block pair
@@ -796,21 +797,26 @@ public class SparkExecutionContext extends ExecutionContext
                                int rows = block.getNumRows();
                                int cols = block.getNumColumns();
                                
+                               //append block
                                if( sparse ) { //SPARSE OUTPUT
-                                       //append block to sparse target in 
order to avoid shifting
-                                       //note: this append requires a final 
sort of sparse rows
-                                       out.appendToSparse(block, row_offset, 
col_offset);
+                                       //append block to sparse target in 
order to avoid shifting, where
+                                       //we use a shallow row copy in case of 
MCSR and single column blocks
+                                       //note: this append requires, for 
multiple column blocks, a final sort 
+                                       out.appendToSparse(block, row_offset, 
col_offset, clen>bclen);
                                }
                                else { //DENSE OUTPUT
                                        out.copy( row_offset, 
row_offset+rows-1, 
                                                          col_offset, 
col_offset+cols-1, block, false );        
                                }
+                               
+                               //incremental maintenance nnz
+                               aNnz += block.getNonZeros();
                        }
                        
                        //post-processing output matrix
-                       if( sparse )
+                       if( sparse && clen>bclen )
                                out.sortSparseRows();
-                       out.recomputeNonZeros();
+                       out.setNonZeros(aNnz);
                        out.examSparsity();
                }
                

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/201238fd/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index a6de5ec..58e2034 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -695,7 +695,11 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
                }
        }
 
-       public void appendToSparse( MatrixBlock that, int rowoffset, int 
coloffset ) 
+       public void appendToSparse( MatrixBlock that, int rowoffset, int 
coloffset ) {
+               appendToSparse(that, rowoffset, coloffset, true);
+       }
+       
+       public void appendToSparse( MatrixBlock that, int rowoffset, int 
coloffset, boolean deep ) 
        {
                if( that==null || that.isEmptyBlock(false) )
                        return; //nothing to append
@@ -713,7 +717,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
                                        
                                //single block append (avoid re-allocations)
                                if( sparseBlock.isEmpty(aix) && coloffset==0 ) 
{ 
-                                       sparseBlock.set(aix, b.get(i), true);
+                                       sparseBlock.set(aix, b.get(i), deep);
                                }
                                else { //general case
                                        int pos = b.pos(i);

Reply via email to