Repository: systemml
Updated Branches:
  refs/heads/master 878430c9f -> 59e9e4d8b


[SYSTEMML-2185,2186] Fix performance of sparse sumByKeyStable w/ corrections

Since SYSTEMML-2038, all dense-to-sparse conversions output sparse blocks
in CSR format. This caused performance issues for the numerically stable
sum by key (as used for all matrix multiplication operators) whenever the
first block aggregation results in sparse aggregate and correction blocks.
Update-in-place on CSR format requires many more shift operations: values
are shifted across the entire block, instead of only within the affected
row as in MCSR format. This patch addresses this by ensuring that all
incremental aggregation operations are performed over MCSR, and by keeping
temporary dense blocks in dense format if possible.

On stratstats with 100K x 1K and 1M x 1K inputs, this patch improved the
end-to-end performance from 740s to 523s and from 38,238s to 566s, respectively.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/59e9e4d8
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/59e9e4d8
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/59e9e4d8

Branch: refs/heads/master
Commit: 59e9e4d8bd82e9d00c1c4e70e41c67e011685956
Parents: 878430c
Author: Matthias Boehm <[email protected]>
Authored: Fri Mar 16 17:49:27 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Fri Mar 16 17:49:27 2018 -0700

----------------------------------------------------------------------
 .../spark/utils/RDDAggregateUtils.java          | 22 ++++--------
 .../sysml/runtime/matrix/data/LibMatrixAgg.java | 35 ++++++++++++--------
 .../sysml/runtime/matrix/data/MatrixBlock.java  | 19 ++++++++---
 3 files changed, 42 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/59e9e4d8/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
index 97870e3..476e93f 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -480,29 +480,21 @@ public class RDDAggregateUtils
                }       
        }
 
-       private static class ExtractMatrixBlock implements 
Function<CorrMatrixBlock, MatrixBlock> 
-       {
+       private static class ExtractMatrixBlock implements 
Function<CorrMatrixBlock, MatrixBlock> {
                private static final long serialVersionUID = 
5242158678070843495L;
-
                @Override
-               public MatrixBlock call(CorrMatrixBlock arg0) 
-                       throws Exception 
-               {
+               public MatrixBlock call(CorrMatrixBlock arg0) throws Exception {
+                       arg0.getValue().examSparsity();
                        return arg0.getValue();
-               }       
+               }
        }
 
-       private static class ExtractDoubleCell implements Function<KahanObject, 
Double> 
-       {
+       private static class ExtractDoubleCell implements Function<KahanObject, 
Double> {
                private static final long serialVersionUID = 
-2873241816558275742L;
-
                @Override
-               public Double call(KahanObject arg0) 
-                       throws Exception 
-               {
-                       //return sum and drop correction
+               public Double call(KahanObject arg0) throws Exception {
                        return arg0._sum;
-               }       
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/systemml/blob/59e9e4d8/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
index 86ed4c4..7fa14bd 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java
@@ -122,10 +122,16 @@ public class LibMatrixAgg
         */
        public static void aggregateBinaryMatrix(MatrixBlock in, MatrixBlock 
aggVal, MatrixBlock aggCorr) 
                throws DMLRuntimeException
-       {       
+       {
                //Timing time = new Timing(true);
-               //boolean saggVal = aggVal.isInSparseFormat(), saggCorr = 
aggCorr.isInSparseFormat(); 
-               //long naggVal = aggVal.getNonZeros(), naggCorr = 
aggCorr.getNonZeros();
+               //boolean saggVal = aggVal.sparse, saggCorr = aggCorr.sparse;
+               //long naggVal = aggVal.nonZeros, naggCorr = aggCorr.nonZeros;
+               
+               //ensure MCSR instead of CSR for update in-place
+               if( aggVal.sparse && aggVal.isAllocated() && 
aggVal.getSparseBlock() instanceof SparseBlockCSR )
+                       aggVal.sparseBlock = 
SparseBlockFactory.copySparseBlock(SparseBlock.Type.MCSR, 
aggVal.getSparseBlock(), true);
+               if( aggCorr.sparse && aggCorr.isAllocated() && 
aggCorr.getSparseBlock() instanceof SparseBlockCSR )
+                       aggCorr.sparseBlock = 
SparseBlockFactory.copySparseBlock(SparseBlock.Type.MCSR, 
aggCorr.getSparseBlock(), true);
                
                //core aggregation
                if(!in.sparse && !aggVal.sparse && !aggCorr.sparse)
@@ -137,10 +143,8 @@ public class LibMatrixAgg
                else //if( !in.sparse ) //any aggVal, aggCorr
                        aggregateBinaryMatrixDenseGeneric(in, aggVal, aggCorr);
                
-               //System.out.println("agg 
("+in.rlen+","+in.clen+","+in.getNonZeros()+","+in.sparse+"), " +
-               //                        "("+naggVal+","+saggVal+"), 
("+naggCorr+","+saggCorr+") -> " +
-               //                        
"("+aggVal.getNonZeros()+","+aggVal.isInSparseFormat()+"), 
("+aggCorr.getNonZeros()+","+aggCorr.isInSparseFormat()+") " +
-               //                        "in "+time.stop()+"ms.");
+               //System.out.println("agg 
("+in.rlen+","+in.clen+","+in.nonZeros+","+in.sparse+"), 
("+naggVal+","+saggVal+"), ("+naggCorr+","+saggCorr+") -> " +
+               //      "("+aggVal.nonZeros+","+aggVal.sparse+"), 
("+aggCorr.nonZeros+","+aggCorr.sparse+") in "+time.stop()+"ms.");
        }
        
        /**
@@ -1011,7 +1015,6 @@ public class LibMatrixAgg
                
                KahanObject buffer1 = new KahanObject(0, 0);
                KahanPlus akplus = KahanPlus.getKahanPlusFnObject();
-               
                final int len = Math.min(a.length, in.rlen*in.clen);
                
                int nnzC = 0;
@@ -1113,14 +1116,16 @@ public class LibMatrixAgg
                        }
                }
                
-               //note: nnz of aggVal/aggCorr maintained internally 
-               aggVal.examSparsity();
-               aggCorr.examSparsity(); 
+               //note: nnz of aggVal/aggCorr maintained internally
+               if( aggVal.sparse )
+                       aggVal.examSparsity(false);
+               if( aggCorr.sparse )
+                       aggCorr.examSparsity(false);
        }
 
        private static void aggregateBinaryMatrixDenseGeneric(MatrixBlock in, 
MatrixBlock aggVal, MatrixBlock aggCorr) 
                throws DMLRuntimeException
-       {       
+       {
                if( in.denseBlock==null || in.isEmptyBlock(false) )
                        return;
                
@@ -1144,8 +1149,10 @@ public class LibMatrixAgg
                        }
                
                //note: nnz of aggVal/aggCorr maintained internally 
-               aggVal.examSparsity();
-               aggCorr.examSparsity();
+               if( aggVal.sparse )
+                       aggVal.examSparsity(false);
+               if( aggCorr.sparse )
+                       aggCorr.examSparsity(false);
        }
 
        private static void 
aggregateBinaryMatrixLastRowDenseGeneric(MatrixBlock in, MatrixBlock aggVal) 

http://git-wip-us.apache.org/repos/asf/systemml/blob/59e9e4d8/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index bf28b79..78c01fa 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -1003,16 +1003,21 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
                return evalSparseFormatOnDisk(lrlen, lclen, nonZeros);
        }
        
+       public void examSparsity() throws DMLRuntimeException {
+               examSparsity(true);
+       }
+       
        /**
         * Evaluates if this matrix block should be in sparse format in
         * memory. Depending on the current representation, the state of the
         * matrix block is changed to the right representation if necessary. 
         * Note that this consumes for the time of execution memory for both 
-        * representations.  
+        * representations.
         * 
+        * @param allowCSR allow CSR format on dense to sparse conversion
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public void examSparsity() 
+       public void examSparsity(boolean allowCSR)
                throws DMLRuntimeException
        {
                //determine target representation
@@ -1027,7 +1032,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
                if( sparse && !sparseDst)
                        sparseToDense();
                else if( !sparse && sparseDst )
-                       denseToSparse();
+                       denseToSparse(allowCSR);
        }
        
        /**
@@ -1081,7 +1086,11 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
        ////////
        // basic block handling functions
        
-       private void denseToSparse()
+       private void denseToSparse() {
+               denseToSparse(true);
+       }
+       
+       private void denseToSparse(boolean allowCSR)
        {
                DenseBlock a = getDenseBlock();
                
@@ -1093,7 +1102,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
                final int m = rlen;
                final int n = clen;
                
-               if( nonZeros <= Integer.MAX_VALUE ) {
+               if( allowCSR && nonZeros <= Integer.MAX_VALUE ) {
                        //allocate target in memory-efficient CSR format
                        int lnnz = (int) nonZeros;
                        int[] rptr = new int[m+1];

Reply via email to