Repository: systemml Updated Branches: refs/heads/master 878430c9f -> 59e9e4d8b
[SYSTEMML-2185,2186] Fix performance sparse sumByKeyStable w/ corr Since SYSTEMML-2038, all dense-to-sparse conversions output sparse blocks in CSR format. This caused performance issues for numerically stable sum by key (as used for all matrix multiplication operators) whenever the first block aggregation results in sparse aggregate and correction blocks, because update-in-place on CSR format requires many more shift operations (shifting values across the entire block instead of only the values of the affected row, as in MCSR format). This patch addresses this by ensuring that all incremental aggregate operations are performed over MCSR and by keeping temporary dense blocks in dense format if possible. On stratstats 100K x 1K and 1M x 1K, this patch improved the end-to-end performance from 740s to 523s and from 38,238s to 566s, respectively. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/59e9e4d8 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/59e9e4d8 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/59e9e4d8 Branch: refs/heads/master Commit: 59e9e4d8bd82e9d00c1c4e70e41c67e011685956 Parents: 878430c Author: Matthias Boehm <[email protected]> Authored: Fri Mar 16 17:49:27 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Fri Mar 16 17:49:27 2018 -0700 ---------------------------------------------------------------------- .../spark/utils/RDDAggregateUtils.java | 22 ++++-------- .../sysml/runtime/matrix/data/LibMatrixAgg.java | 35 ++++++++++++-------- .../sysml/runtime/matrix/data/MatrixBlock.java | 19 ++++++++--- 3 files changed, 42 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/59e9e4d8/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java index 97870e3..476e93f 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDAggregateUtils.java @@ -480,29 +480,21 @@ public class RDDAggregateUtils } } - private static class ExtractMatrixBlock implements Function<CorrMatrixBlock, MatrixBlock> - { + private static class ExtractMatrixBlock implements Function<CorrMatrixBlock, MatrixBlock> { private static final long serialVersionUID = 5242158678070843495L; - @Override - public MatrixBlock call(CorrMatrixBlock arg0) - throws Exception - { + public MatrixBlock call(CorrMatrixBlock arg0) throws Exception { + arg0.getValue().examSparsity(); return arg0.getValue(); - } + } } - private static class ExtractDoubleCell implements Function<KahanObject, Double> - { + private static class ExtractDoubleCell implements Function<KahanObject, Double> { private static final long serialVersionUID = -2873241816558275742L; - @Override - public Double call(KahanObject arg0) - throws Exception - { - //return sum and drop correction + public Double call(KahanObject arg0) throws Exception { return arg0._sum; - } + } } /** http://git-wip-us.apache.org/repos/asf/systemml/blob/59e9e4d8/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java index 86ed4c4..7fa14bd 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java @@ -122,10 +122,16 @@ public class LibMatrixAgg */ public static void 
aggregateBinaryMatrix(MatrixBlock in, MatrixBlock aggVal, MatrixBlock aggCorr) throws DMLRuntimeException - { + { //Timing time = new Timing(true); - //boolean saggVal = aggVal.isInSparseFormat(), saggCorr = aggCorr.isInSparseFormat(); - //long naggVal = aggVal.getNonZeros(), naggCorr = aggCorr.getNonZeros(); + //boolean saggVal = aggVal.sparse, saggCorr = aggCorr.sparse; + //long naggVal = aggVal.nonZeros, naggCorr = aggCorr.nonZeros; + + //ensure MCSR instead of CSR for update in-place + if( aggVal.sparse && aggVal.isAllocated() && aggVal.getSparseBlock() instanceof SparseBlockCSR ) + aggVal.sparseBlock = SparseBlockFactory.copySparseBlock(SparseBlock.Type.MCSR, aggVal.getSparseBlock(), true); + if( aggCorr.sparse && aggCorr.isAllocated() && aggCorr.getSparseBlock() instanceof SparseBlockCSR ) + aggCorr.sparseBlock = SparseBlockFactory.copySparseBlock(SparseBlock.Type.MCSR, aggCorr.getSparseBlock(), true); //core aggregation if(!in.sparse && !aggVal.sparse && !aggCorr.sparse) @@ -137,10 +143,8 @@ public class LibMatrixAgg else //if( !in.sparse ) //any aggVal, aggCorr aggregateBinaryMatrixDenseGeneric(in, aggVal, aggCorr); - //System.out.println("agg ("+in.rlen+","+in.clen+","+in.getNonZeros()+","+in.sparse+"), " + - // "("+naggVal+","+saggVal+"), ("+naggCorr+","+saggCorr+") -> " + - // "("+aggVal.getNonZeros()+","+aggVal.isInSparseFormat()+"), ("+aggCorr.getNonZeros()+","+aggCorr.isInSparseFormat()+") " + - // "in "+time.stop()+"ms."); + //System.out.println("agg ("+in.rlen+","+in.clen+","+in.nonZeros+","+in.sparse+"), ("+naggVal+","+saggVal+"), ("+naggCorr+","+saggCorr+") -> " + + // "("+aggVal.nonZeros+","+aggVal.sparse+"), ("+aggCorr.nonZeros+","+aggCorr.sparse+") in "+time.stop()+"ms."); } /** @@ -1011,7 +1015,6 @@ public class LibMatrixAgg KahanObject buffer1 = new KahanObject(0, 0); KahanPlus akplus = KahanPlus.getKahanPlusFnObject(); - final int len = Math.min(a.length, in.rlen*in.clen); int nnzC = 0; @@ -1113,14 +1116,16 @@ public class LibMatrixAgg } } - 
//note: nnz of aggVal/aggCorr maintained internally - aggVal.examSparsity(); - aggCorr.examSparsity(); + //note: nnz of aggVal/aggCorr maintained internally + if( aggVal.sparse ) + aggVal.examSparsity(false); + if( aggCorr.sparse ) + aggCorr.examSparsity(false); } private static void aggregateBinaryMatrixDenseGeneric(MatrixBlock in, MatrixBlock aggVal, MatrixBlock aggCorr) throws DMLRuntimeException - { + { if( in.denseBlock==null || in.isEmptyBlock(false) ) return; @@ -1144,8 +1149,10 @@ public class LibMatrixAgg } //note: nnz of aggVal/aggCorr maintained internally - aggVal.examSparsity(); - aggCorr.examSparsity(); + if( aggVal.sparse ) + aggVal.examSparsity(false); + if( aggCorr.sparse ) + aggCorr.examSparsity(false); } private static void aggregateBinaryMatrixLastRowDenseGeneric(MatrixBlock in, MatrixBlock aggVal) http://git-wip-us.apache.org/repos/asf/systemml/blob/59e9e4d8/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index bf28b79..78c01fa 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -1003,16 +1003,21 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab return evalSparseFormatOnDisk(lrlen, lclen, nonZeros); } + public void examSparsity() throws DMLRuntimeException { + examSparsity(true); + } + /** * Evaluates if this matrix block should be in sparse format in * memory. Depending on the current representation, the state of the * matrix block is changed to the right representation if necessary. * Note that this consumes for the time of execution memory for both - * representations. + * representations. 
* + * @param allowCSR allow CSR format on dense to sparse conversion * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public void examSparsity() + public void examSparsity(boolean allowCSR) throws DMLRuntimeException { //determine target representation @@ -1027,7 +1032,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab if( sparse && !sparseDst) sparseToDense(); else if( !sparse && sparseDst ) - denseToSparse(); + denseToSparse(allowCSR); } /** @@ -1081,7 +1086,11 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab //////// // basic block handling functions - private void denseToSparse() + private void denseToSparse() { + denseToSparse(true); + } + + private void denseToSparse(boolean allowCSR) { DenseBlock a = getDenseBlock(); @@ -1093,7 +1102,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab final int m = rlen; final int n = clen; - if( nonZeros <= Integer.MAX_VALUE ) { + if( allowCSR && nonZeros <= Integer.MAX_VALUE ) { //allocate target in memory-efficient CSR format int lnnz = (int) nonZeros; int[] rptr = new int[m+1];
