[SYSTEMML-2459] Performance/robustness ultra-sparse self matrix products

This patch improves the performance and robustness (with regard to OOM) of ultra-sparse self matrix multiplications, which are common over graph datasets. Specifically, we now have a special case for ultra-sparse self products based on temporary dense and sparse row aggregation, as well as related logic to select appropriate sparse outputs.
In addition, this patch includes minor improvements to the decision on multi-threaded operations (guard against long overflows) and an extension of sparse blocks for in-place addition (to avoid unnecessary binary search operations) and for sparse row compaction. With this patch, the following two scenarios now run properly even in single-node setups:
a) germany_osm (11M x 11M, 24M non-zeros)
 * baseline: >1h on ultra-sparse w/ sparse output
 * new: 1.9s
b) uk-2005 (39M x 39M, 936M non-zeros)
 * baseline: OOM on sparse-sparse w/ dense output
 * new: 984s (still dominated by garbage collection)

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/59a0bb26
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/59a0bb26
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/59a0bb26

Branch: refs/heads/master
Commit: 59a0bb264f87b5fd352f72915d0f6cf207ebd259
Parents: aca62ba
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Sat Jul 21 01:14:21 2018 -0700
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Sat Jul 21 01:14:21 2018 -0700

----------------------------------------------------------------------
 .../runtime/matrix/data/LibMatrixMult.java | 207 ++++++++++++-------
 .../sysml/runtime/matrix/data/SparseBlock.java | 22 +-
 .../runtime/matrix/data/SparseBlockCOO.java | 10 +
 .../runtime/matrix/data/SparseBlockCSR.java | 31 +++
 .../runtime/matrix/data/SparseBlockMCSR.java | 19 ++
 .../sysml/runtime/matrix/data/SparseRow.java | 10 +
 .../runtime/matrix/data/SparseRowScalar.java | 5 +
 .../runtime/matrix/data/SparseRowVector.java | 22 ++
 .../sysml/runtime/util/UtilFunctions.java | 8 +
 9 files changed, 259 insertions(+), 75 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/systemml/blob/59a0bb26/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java index 1b7b8f4..8f60003 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java @@ -1507,92 +1507,150 @@ public class LibMatrixMult final boolean leftUS = m1.isUltraSparse() || (m1.isUltraSparse(false) && !m2.isUltraSparse()) || (m1.sparse && !m2.sparse); + + if( m1 == m2 ) //self-product + matrixMultUltraSparseSelf(m1, ret, rl, ru); + else if( leftUS ) + matrixMultUltraSparseLeft(m1, m2, ret, rl, ru); + else + matrixMultUltraSparseRight(m1, m2, ret, rl, ru); + //no need to recompute nonzeros because maintained internally + } + + private static void matrixMultUltraSparseSelf(MatrixBlock m1, MatrixBlock ret, int rl, int ru) { + //common use case: self product G %*% G of graph resulting in dense but still sparse output + int n = m1.clen; //m2.clen + SparseBlock a = m1.sparseBlock; + SparseBlock c = ret.sparseBlock; + double[] tmp = null; + + //IKJ with dense working row for lhs nnz/row > threshold + for( int i=rl; i<ru; i++ ) { + if( a.isEmpty(i) ) continue; + int alen = a.size(i); + int apos = a.pos(i); + int[] aix = a.indexes(i); + double[] avals = a.values(i); + + //compute number of aggregated non-zeros for input row + int nnz1 = (int) Math.min(UtilFunctions.computeNnz(a, aix, apos, alen), n); + boolean ldense = nnz1 > n / 128; + + //perform vector-matrix multiply w/ dense or sparse output + if( ldense ) { //init dense tmp row + tmp = (tmp == null) ? 
new double[n] : tmp; + Arrays.fill(tmp, 0); + } + for( int k=apos; k<apos+alen; k++ ) { + if( a.isEmpty(aix[k]) ) continue; + int blen = a.size(aix[k]); + int bpos = a.pos(aix[k]); + int[] bix = a.indexes(aix[k]); + double aval = avals[k]; + double[] bvals = a.values(aix[k]); + if( ldense ) { //dense aggregation + for( int j=bpos; j<bpos+blen; j++ ) + tmp[bix[j]] += aval * bvals[j]; + } + else { //sparse aggregation + c.allocate(i, nnz1); + for( int j=bpos; j<bpos+blen; j++ ) + c.add(i, bix[j], aval * bvals[j]); + c.compact(i); //conditional compaction + } + } + if( ldense ) { //copy dense tmp row + int nnz2 = UtilFunctions.computeNnz(tmp, 0, n); + c.allocate(i, nnz2); //avoid reallocation + for( int j=0; j<n; j++ ) + c.append(i, j, tmp[j]); + } + } + //recompute non-zero for single-threaded + if( rl == 0 && ru == m1.rlen ) + ret.recomputeNonZeros(); + } + + private static void matrixMultUltraSparseLeft(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret, int rl, int ru) { final int m = m1.rlen; - final int cd = m1.clen; final int n = m2.clen; - if( leftUS ) //left is ultra-sparse (IKJ) - { - SparseBlock a = m1.sparseBlock; - SparseBlock c = ret.sparseBlock; - boolean rightSparse = m2.sparse; - - for( int i=rl; i<ru; i++ ) - { - if( a.isEmpty(i) ) continue; - int apos = a.pos(i); - int alen = a.size(i); - int[] aixs = a.indexes(i); - double[] avals = a.values(i); - - if( alen==1 ) { - //row selection (now aggregation) with potential scaling - int aix = aixs[apos]; - int lnnz = 0; - if( rightSparse ) { //sparse right matrix (full row copy) - if( !m2.sparseBlock.isEmpty(aix) ) { - ret.rlen=m; - ret.allocateSparseRowsBlock(false); //allocation on demand - boolean ldeep = (m2.sparseBlock instanceof SparseBlockMCSR); - ret.sparseBlock.set(i, m2.sparseBlock.get(aix), ldeep); - ret.nonZeros += (lnnz = ret.sparseBlock.size(i)); - } + //left is ultra-sparse (IKJ) + SparseBlock a = m1.sparseBlock; + SparseBlock c = ret.sparseBlock; + boolean rightSparse = m2.sparse; + + for( int 
i=rl; i<ru; i++ ) { + if( a.isEmpty(i) ) continue; + int apos = a.pos(i); + int alen = a.size(i); + int[] aixs = a.indexes(i); + double[] avals = a.values(i); + if( alen==1 ) { + //row selection (now aggregation) with potential scaling + int aix = aixs[apos]; + int lnnz = 0; + if( rightSparse ) { //sparse right matrix (full row copy) + if( !m2.sparseBlock.isEmpty(aix) ) { + ret.rlen=m; + ret.allocateSparseRowsBlock(false); //allocation on demand + boolean ldeep = (m2.sparseBlock instanceof SparseBlockMCSR); + ret.sparseBlock.set(i, m2.sparseBlock.get(aix), ldeep); + ret.nonZeros += (lnnz = ret.sparseBlock.size(i)); } - else { //dense right matrix (append all values) - lnnz = (int)m2.recomputeNonZeros(aix, aix, 0, n-1); - if( lnnz > 0 ) { - c.allocate(i, lnnz); //allocate once - double[] bvals = m2.getDenseBlock().values(aix); - for( int j=0, bix=m2.getDenseBlock().pos(aix); j<n; j++ ) - c.append(i, j, bvals[bix+j]); - ret.nonZeros += lnnz; - } + } + else { //dense right matrix (append all values) + lnnz = (int)m2.recomputeNonZeros(aix, aix, 0, n-1); + if( lnnz > 0 ) { + c.allocate(i, lnnz); //allocate once + double[] bvals = m2.getDenseBlock().values(aix); + for( int j=0, bix=m2.getDenseBlock().pos(aix); j<n; j++ ) + c.append(i, j, bvals[bix+j]); + ret.nonZeros += lnnz; } - //optional scaling if not pure selection - if( avals[apos] != 1 && lnnz > 0 ) - vectMultiplyInPlace(avals[apos], c.values(i), c.pos(i), c.size(i)); } - else //GENERAL CASE - { - for( int k=apos; k<apos+alen; k++ ) - { - double aval = avals[k]; - int aix = aixs[k]; - for( int j=0; j<n; j++ ) - { - double cval = ret.quickGetValue(i, j); - double cvald = aval*m2.quickGetValue(aix, j); - if( cvald != 0 ) - ret.quickSetValue(i, j, cval+cvald); - } + //optional scaling if not pure selection + if( avals[apos] != 1 && lnnz > 0 ) + vectMultiplyInPlace(avals[apos], c.values(i), c.pos(i), c.size(i)); + } + else { //GENERAL CASE + for( int k=apos; k<apos+alen; k++ ) { + double aval = avals[k]; + int aix = 
aixs[k]; + for( int j=0; j<n; j++ ) { + double cval = ret.quickGetValue(i, j); + double cvald = aval*m2.quickGetValue(aix, j); + if( cvald != 0 ) + ret.quickSetValue(i, j, cval+cvald); } } } } - else //right is ultra-sparse (KJI) - { - SparseBlock b = m2.sparseBlock; - - for(int k = 0; k < cd; k++ ) { - if( b.isEmpty(k) ) continue; - int bpos = b.pos(k); - int blen = b.size(k); - int[] bixs = b.indexes(k); - double[] bvals = b.values(k); - for( int j=bpos; j<bpos+blen; j++ ) { - double bval = bvals[j]; - int bix = bixs[j]; - for( int i=rl; i<ru; i++ ) { - double cvald = bval*m1.quickGetValue(i, k); - if( cvald != 0 ){ - double cval = ret.quickGetValue(i, bix); - ret.quickSetValue(i, bix, cval+cvald); - } + } + + private static void matrixMultUltraSparseRight(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret, int rl, int ru) { + final int cd = m1.clen; + + //right is ultra-sparse (KJI) + SparseBlock b = m2.sparseBlock; + for(int k = 0; k < cd; k++ ) { + if( b.isEmpty(k) ) continue; + int bpos = b.pos(k); + int blen = b.size(k); + int[] bixs = b.indexes(k); + double[] bvals = b.values(k); + for( int j=bpos; j<bpos+blen; j++ ) { + double bval = bvals[j]; + int bix = bixs[j]; + for( int i=rl; i<ru; i++ ) { + double cvald = bval*m1.quickGetValue(i, k); + if( cvald != 0 ){ + double cval = ret.quickGetValue(i, bix); + ret.quickSetValue(i, bix, cval+cvald); } } } } - //no need to recompute nonzeros because maintained internally } private static void matrixMultChainDense(MatrixBlock mX, MatrixBlock mV, MatrixBlock mW, MatrixBlock ret, ChainType ct, int rl, int ru) @@ -3755,7 +3813,9 @@ public class LibMatrixMult boolean sharedTP = (InfrastructureAnalyzer.getLocalParallelism() == k); return k > 1 && LOW_LEVEL_OPTIMIZATION && (!checkMem || 8L * m2.clen * k < MEM_OVERHEAD_THRESHOLD) - && (!checkFLOPs || FPfactor * m1.rlen * m1.clen * m2.clen > + //note: cast to double to avoid long overflows on ultra-sparse matrices + //due to FLOP computation based on number of cells not 
non-zeros + && (!checkFLOPs || (double)FPfactor * m1.rlen * m1.clen * m2.clen > (sharedTP ? PAR_MINFLOP_THRESHOLD2 : PAR_MINFLOP_THRESHOLD1)); } @@ -3775,6 +3835,7 @@ public class LibMatrixMult double outSp = OptimizerUtils.getMatMultSparsity( m1.getSparsity(), m2.getSparsity(), m1.rlen, m1.clen, m2.clen, true); return (m1.isUltraSparse() || m2.isUltraSparse()) //base case + || (m1.isUltraSparse(false) && m1 == m2) //ultra-sparse self product || (m1.isUltraSparsePermutationMatrix() && OptimizerUtils.getSparsity(m2.rlen, m2.clen, m2.nonZeros)<1.0) || ((m1.isUltraSparse(false) || m2.isUltraSparse(false)) http://git-wip-us.apache.org/repos/asf/systemml/blob/59a0bb26/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java index 7579029..16d84e6 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java @@ -80,6 +80,13 @@ public abstract class SparseBlock implements Serializable */ public abstract void allocate(int r, int ennz, int maxnnz); + /** + * Re-allocate physical row if physical size exceeds + * logical size plus resize factor. + * + * @param r row index + */ + public abstract void compact(int r); //////////////////////// //obtain basic meta data @@ -306,7 +313,7 @@ public abstract class SparseBlock implements Serializable * @param r row index starting at 0 * @param c column index starting at 0 * @param v zero or non-zero value - * @return ? + * @return true, if number of non-zeros changed */ public abstract boolean set(int r, int c, double v); @@ -324,10 +331,21 @@ public abstract class SparseBlock implements Serializable public abstract void set(int r, SparseRow row, boolean deep); /** + * Add a value to a matrix cell (r,c). 
This might update an existing + * non-zero value, or insert a new non-zero value. + * + * @param r row index starting at 0 + * @param c column index starting at 0 + * @param v zero or non-zero value + * @return true, if number of non-zeros changed + */ + public abstract boolean add(int r, int c, double v); + + /** * Append a value to the end of the physical representation. This should * only be used for operations with sequential write pattern or if followed * by a sort() operation. Note that this operation does not perform any - * matrix cell updates. + * matrix cell updates. * * @param r row index starting at 0 * @param c column index starting at 0 http://git-wip-us.apache.org/repos/asf/systemml/blob/59a0bb26/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java index 1c8e3fe..31fb6bf 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java @@ -171,6 +171,11 @@ public class SparseBlockCOO extends SparseBlock public void allocate(int r, int ennz, int maxnnz) { //do nothing everything preallocated } + + @Override + public void compact(int r) { + //do nothing everything preallocated + } @Override public int numRows() { @@ -380,6 +385,11 @@ public class SparseBlockCOO extends SparseBlock System.arraycopy(aix, 0, _cindexes, pos, alen); System.arraycopy(avals, 0, _values, pos, alen); } + + @Override + public boolean add(int r, int c, double v) { + return set(r, c, get(r, c) + v); + } @Override public void append(int r, int c, double v) { http://git-wip-us.apache.org/repos/asf/systemml/blob/59a0bb26/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java index 1365f95..0d06639 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java @@ -322,6 +322,11 @@ public class SparseBlockCSR extends SparseBlock public void allocate(int r, int ennz, int maxnnz) { //do nothing everything preallocated } + + @Override + public void compact(int r) { + //do nothing everything preallocated + } @Override public int numRows() { @@ -453,6 +458,32 @@ public class SparseBlockCSR extends SparseBlock incrPtr(r+1); return true; // nnz++ } + + @Override + public boolean add(int r, int c, double v) { + //early abort on zero + if( v==0 ) return false; + + int pos = pos(r); + int len = size(r); + + //search for existing col index + int index = Arrays.binarySearch(_indexes, pos, pos+len, c); + if( index >= 0 ) { + //add to existing value + _values[index] += v; + return false; + } + + //insert new index-value pair + index = Math.abs( index+1 ); + if( _size==_values.length ) + resizeAndInsert(index, c, v); + else + shiftRightAndInsert(index, c, v); + incrPtr(r+1); + return true; // nnz++ + } @Override public void set(int r, SparseRow row, boolean deep) { http://git-wip-us.apache.org/repos/asf/systemml/blob/59a0bb26/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java index 4cbf49a..840d034 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java @@ -139,6 +139,16 @@ public 
class SparseBlockMCSR extends SparseBlock } @Override + public void compact(int r) { + if( isAllocated(r) && _rows[r] instanceof SparseRowVector + && _rows[r].size() > SparseBlock.INIT_CAPACITY + && _rows[r].size() * SparseBlock.RESIZE_FACTOR1 + < ((SparseRowVector)_rows[r]).capacity() ) { + ((SparseRowVector)_rows[r]).compact(); + } + } + + @Override public int numRows() { return _rows.length; } @@ -296,6 +306,15 @@ public class SparseBlockMCSR extends SparseBlock } @Override + public boolean add(int r, int c, double v) { + if( !isAllocated(r) ) + _rows[r] = new SparseRowScalar(); + else if( _rows[r] instanceof SparseRowScalar && !_rows[r].isEmpty()) + _rows[r] = new SparseRowVector(_rows[r]); + return _rows[r].add(c, v); + } + + @Override public void append(int r, int c, double v) { if( !isAllocated(r) ) _rows[r] = new SparseRowScalar(); http://git-wip-us.apache.org/repos/asf/systemml/blob/59a0bb26/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRow.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRow.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRow.java index 2ce2d87..e967c4d 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRow.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRow.java @@ -82,6 +82,16 @@ public abstract class SparseRow implements Serializable public abstract boolean set(int col, double v); /** + * Add a value to a specified column with awareness of + * potential insertions. + * + * @param col column index, zero-based + * @param v value + * @return true if the size of the sparse row changed + */ + public abstract boolean add(int col, double v); + + /** * Appends a value to the end of the sparse row. 
* * @param col column index, zero-based http://git-wip-us.apache.org/repos/asf/systemml/blob/59a0bb26/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowScalar.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowScalar.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowScalar.java index fa95e4c..a1e09cd 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowScalar.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowScalar.java @@ -75,6 +75,11 @@ public final class SparseRowScalar extends SparseRow implements Serializable value = v; return ret; } + + @Override + public boolean add(int col, double v) { + return set(col, v + get(col)); + } @Override public void append(int col, double v) { http://git-wip-us.apache.org/repos/asf/systemml/blob/59a0bb26/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java index 696f417..50acc28 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java @@ -185,6 +185,28 @@ public final class SparseRowVector extends SparseRow implements Serializable shiftRightAndInsert(index, col, v); return true; // nnz++ } + + @Override + public boolean add(int col, double v) { + //early abort on zero (if no overwrite) + if( v==0.0 ) return false; + + //search for existing col index + int index = Arrays.binarySearch(indexes, 0, size, col); + if( index >= 0 ) { + //add to existing values + values[index] += v; + return false; + } + + //insert new index-value pair + index = Math.abs( index+1 ); + if( size==values.length ) + resizeAndInsert(index, col, v); 
+ else + shiftRightAndInsert(index, col, v); + return true; // nnz++ + } @Override public void append(int col, double v) { http://git-wip-us.apache.org/repos/asf/systemml/blob/59a0bb26/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java index 58dde27..42b519b 100644 --- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java +++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java @@ -35,6 +35,7 @@ import org.apache.sysml.runtime.matrix.MetaDataNumItemsByEachReducer; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.Pair; +import org.apache.sysml.runtime.matrix.data.SparseBlock; import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue; public class UtilFunctions @@ -608,6 +609,13 @@ public class UtilFunctions lnnz += (a[i] != 0) ? 1 : 0; return lnnz; } + + public static long computeNnz(SparseBlock a, int[] aix, int ai, int alen) { + long lnnz = 0; + for( int k=ai; k<ai+alen; k++ ) + lnnz += a.size(aix[k]); + return lnnz; + } public static ValueType[] nCopies(int n, ValueType vt) { ValueType[] ret = new ValueType[n];