This is an automated email from the ASF dual-hosted git repository. baunsgaard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push: new fac955c757 [SYSTEMDS-3653] Ultra Sparse MM Optimization fac955c757 is described below commit fac955c757015fa6c2c187725f36afbaec6ee6f6 Author: Sebastian Baunsgaard <baunsga...@apache.org> AuthorDate: Thu Nov 30 10:31:32 2023 +0100 [SYSTEMDS-3653] Ultra Sparse MM Optimization This commit update the left side ultra sparse matrix multiplication to remove indirections and optimize JIT compilation. We see improvements of up to 9x in small examples. Left side one non zero per row 100k by 1m %% 1m by 100 sp 0.1 -> Before: 6.5 After : 4.5 sec Left side two non zero per row 200k by 1m %% 1m by 100 sp 0.1 -> Before 173.724 After : 19.5 sec Left side one non zero per row 100k by 1m %% 1m by 100 sp 0.43 -> Before: 65.06 After : 29.039 sec Closes #1951 --- .../runtime/compress/CompressedMatrixBlock.java | 9 +- .../apache/sysds/runtime/data/SparseBlockMCSR.java | 2 +- .../matrix/data/LibMatrixDenseToSparse.java | 160 +++++++++++------ .../sysds/runtime/matrix/data/LibMatrixMult.java | 198 +++++++++++++++------ .../matrix/data/LibMatrixSparseToDense.java | 184 +++++++------------ .../sysds/runtime/matrix/data/MatrixBlock.java | 93 +++++----- 6 files changed, 369 insertions(+), 277 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java index 564037cb48..92200d4384 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java @@ -1152,12 +1152,17 @@ public class CompressedMatrixBlock extends MatrixBlock { } @Override - public void examSparsity(boolean allowCSR) { + public void examSparsity(boolean allowCSR, int k) { // do nothing } @Override - public void sparseToDense() { + public void sparseToDense(int k) { + // do nothing + } + + @Override + public void denseToSparse(boolean allowCSR, int k){ // do nothing } diff --git a/src/main/java/org/apache/sysds/runtime/data/SparseBlockMCSR.java b/src/main/java/org/apache/sysds/runtime/data/SparseBlockMCSR.java index e889d58b68..08dbc8b0a4 100644 --- a/src/main/java/org/apache/sysds/runtime/data/SparseBlockMCSR.java +++ b/src/main/java/org/apache/sysds/runtime/data/SparseBlockMCSR.java @@ -291,7 +291,7 @@ public class SparseBlockMCSR extends SparseBlock @Override public final boolean isEmpty(int r) { - return !isAllocated(r) || _rows[r].isEmpty(); + return _rows[r] == null || _rows[r].isEmpty(); } @Override diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixDenseToSparse.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixDenseToSparse.java index 5280aa5f9b..7c687578d0 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixDenseToSparse.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixDenseToSparse.java @@ -26,7 +26,6 @@ import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysds.runtime.data.DenseBlock; import org.apache.sysds.runtime.data.SparseBlockCSR; import org.apache.sysds.runtime.data.SparseBlockMCSR; @@ -44,6 +43,10 @@ public interface LibMatrixDenseToSparse { * @param allowCSR If CSR is allowed. */ public static void denseToSparse(MatrixBlock r, boolean allowCSR) { + denseToSparse(r, allowCSR, 1); + } + + public static void denseToSparse(MatrixBlock r, boolean allowCSR, int k) { final DenseBlock a = r.getDenseBlock(); // set target representation, early abort on empty blocks @@ -51,12 +54,10 @@ public interface LibMatrixDenseToSparse { if(a == null) return; - final int k = InfrastructureAnalyzer.getLocalParallelism(); - - if(k > 1 && r.getNumRows() > 1000) + if(k > 1 && r.getSparsity() > 0.01 && (r.rlen > 100 || ((long) r.rlen * r.clen > 100000))) denseToSparseParallel(r, k, allowCSR); else if(allowCSR && r.nonZeros <= Integer.MAX_VALUE) - denseToSparseCSR(r); + denseToSparseCSRSafe(r); else denseToSparseMCSR(r); @@ -64,30 +65,12 @@ public interface LibMatrixDenseToSparse { r.denseBlock = null; } - private static void denseToSparseCSR(MatrixBlock r) { - final DenseBlock a = r.getDenseBlock(); - final int m = r.rlen; - final int n = r.clen; + private static void denseToSparseCSRSafe(MatrixBlock r) { try { - // allocate target in memory-efficient CSR format - int lnnz = (int) r.nonZeros; - int[] rptr = new int[m + 1]; - int[] indexes = new int[lnnz]; - double[] values = new double[lnnz]; - for(int i = 0, pos = 0; i < m; i++) { - double[] avals = a.values(i); - int aix = a.pos(i); - for(int j = 0; j < n; j++) { - double aval = avals[aix + j]; - if(aval != 0) { - indexes[pos] = j; - values[pos] = aval; - pos++; - } - } - rptr[i + 1] = pos; - } - r.sparseBlock = new SparseBlockCSR(rptr, indexes, values, lnnz); + if(r.getNumRows() == 1) + denseToSparseCSRSingleRow(r); + else + denseToSparseCSR(r); } catch(ArrayIndexOutOfBoundsException ioobe) { r.sparse = false; @@ -104,6 +87,49 @@ public interface LibMatrixDenseToSparse { } } + private static void denseToSparseCSRSingleRow(MatrixBlock r) { + final DenseBlock a = r.getDenseBlock(); + final int n = r.clen; + final int lnnz = (int) r.nonZeros; + final int[] indexes = new int[lnnz]; + final double[] values = new double[lnnz]; + final double[] avals = a.values(0); + addColCSR(avals, n, 0, indexes, values, 0); + final int[] rptr = new int[2]; + rptr[1] = lnnz; + r.sparseBlock = new SparseBlockCSR(rptr, indexes, values, lnnz); + } + + private static void denseToSparseCSR(MatrixBlock r) { + final DenseBlock a = r.getDenseBlock(); + final int m = r.rlen; + final int n = r.clen; + // allocate target in memory-efficient CSR format + final int lnnz = (int) r.nonZeros; + final int[] rptr = new int[m + 1]; + final int[] indexes = new int[lnnz]; + final double[] values = new double[lnnz]; + for(int i = 0, pos = 0; i < m; i++) { + final double[] avals = a.values(i); + final int aix = a.pos(i); + pos = addColCSR(avals, n, aix, indexes, values, pos); + rptr[i + 1] = pos; + } + r.sparseBlock = new SparseBlockCSR(rptr, indexes, values, lnnz); + } + + private static int addColCSR(double[] avals, int n, int aix, int[] indexes, double[] values, int pos) { + for(int j = 0; j < n; j++) { + final double aval = avals[aix + j]; + if(aval != 0) { + indexes[pos] = j; + values[pos] = aval; + pos++; + } + } + return pos; + } + private static void denseToSparseMCSR(MatrixBlock r) { final DenseBlock a = r.getDenseBlock(); @@ -116,62 +142,92 @@ public interface LibMatrixDenseToSparse { if(!r.allocateSparseRowsBlock()) r.reset(); // reset if not allocated SparseBlockMCSR sblock = (SparseBlockMCSR) r.sparseBlock; - toSparseMCSRRange(a, sblock, n, 0, m); + toSparseMCSRRangeScan(a, sblock, n, 0, m); r.nonZeros = nnzTemp; } - private static void toSparseMCSRRange(DenseBlock a, SparseBlockMCSR b, int n, int rl, int ru) { + private static void toSparseMCSRRangeNoScan(DenseBlock a, SparseBlockMCSR b, int n, int rl, int ru, int est) { + for(int i = rl; i < ru; i++) + toSparseMCSRRowNoScan(a, b, n, i, est); + } + + private static void toSparseMCSRRangeScan(DenseBlock a, SparseBlockMCSR b, int n, int rl, int ru) { for(int i = rl; i < ru; i++) - toSparseMCSRRow(a, b, n, i); + toSparseMCSRRowScan(a, b, n, i); } - private static void toSparseMCSRRow(DenseBlock a, SparseBlockMCSR b, int n, int i) { + private static void toSparseMCSRRowScan(DenseBlock a, SparseBlockMCSR b, int n, int i) { final double[] avals = a.values(i); final int aix = a.pos(i); // compute nnz per row (not via recomputeNonZeros as sparse allocated) final int lnnz = UtilFunctions.computeNnz(avals, aix, n); if(lnnz <= 0) return; + SparseRowVector sb = toSparseMCSRRowKnownNNZ(avals, aix, n, i, lnnz); + b.set(i, sb, false); + } + private static void toSparseMCSRRowNoScan(DenseBlock a, SparseBlockMCSR b, int n, int i, int est) { + final double[] avals = a.values(i); + final int aix = a.pos(i); + final SparseRowVector sb = new SparseRowVector(est / 2); + toSparseMCSRRowUnknownNNZ(avals, aix, n, i, sb); + b.set(i, sb, false); + } + + private static SparseRowVector toSparseMCSRRowKnownNNZ(double[] avals, int aix, int n, int i, int lnnz) { // allocate sparse row and append non-zero values final double[] vals = new double[lnnz]; final int[] idx = new int[lnnz]; - for(int j = 0, o = 0; j < n; j++) { - double v = avals[aix + j]; + final double v = avals[aix + j]; if(v != 0.0) { vals[o] = v; idx[o] = j; o++; } } - b.set(i, new SparseRowVector(vals, idx), false); + return new SparseRowVector(vals, idx); + } + + private static void toSparseMCSRRowUnknownNNZ(final double[] avals, int aix, final int n, final int i, + final SparseRowVector sb) { + for(int j = 0; j < n; j++) + sb.append(j, avals[aix++]); + sb.compact(); } private static void denseToSparseParallel(MatrixBlock r, int k, boolean allowCSR) { final DenseBlock a = r.getDenseBlock(); - r.denseBlock = null; - r.sparseBlock = null; - final int m = r.rlen; - final int n = r.clen; - // remember number non zeros. - final long nnzTemp = r.getNonZeros(); - r.reset(r.getNumRows(), r.getNumColumns(), nnzTemp); - // fallback to less-memory efficient MCSR format, for efficient parallel conversion. - r.sparseBlock = new SparseBlockMCSR(r.getNumRows()); - r.sparse = true; - final SparseBlockMCSR b = (SparseBlockMCSR) r.sparseBlock; - final int blockSize = Math.max(250, m / k); - ExecutorService pool = CommonThreadPool.get(k); + final ExecutorService pool = CommonThreadPool.get(k); try { - + r.denseBlock = null; + r.sparseBlock = null; + final int m = r.rlen; + final int n = r.clen; + // remember number non zeros. + final long nnzTemp = r.getNonZeros(); + final double sp = r.getSparsity(); + r.reset(r.getNumRows(), r.getNumColumns(), nnzTemp); + // fallback to less-memory efficient MCSR format, for efficient parallel conversion. + r.sparseBlock = new SparseBlockMCSR(r.getNumRows()); + r.sparse = true; + final SparseBlockMCSR b = (SparseBlockMCSR) r.sparseBlock; + final int blockSize = Math.max(1, m / k); + final int est = Math.max(1, (int) (n * sp)); + // LOG.error(nnzTemp + " " + m + " " + n); + // LOG.error(sp); List<Future<?>> tasks = new ArrayList<>(); for(int i = 0; i < m; i += blockSize) { final int start = i; final int end = Math.min(m, i + blockSize); - tasks.add(pool.submit(() -> toSparseMCSRRange(a, b, n, start, end))); + if(sp < 0.03) + tasks.add(pool.submit(() -> toSparseMCSRRangeNoScan(a, b, n, start, end, est))); + else + tasks.add(pool.submit(() -> toSparseMCSRRangeScan(a, b, n, start, end))); } + r.nonZeros = nnzTemp; for(Future<?> t : tasks) t.get(); } @@ -181,7 +237,5 @@ public interface LibMatrixDenseToSparse { finally { pool.shutdown(); } - - r.nonZeros = nnzTemp; } -} +} \ No newline at end of file diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java index 98b3eaa1bb..41dc7f2264 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixMult.java @@ -200,16 +200,22 @@ public class LibMatrixMult ret = new MatrixBlock(m1.rlen, m2.clen, ultraSparse | sparse); else ret.reset(m1.rlen, m2.clen, ultraSparse | sparse); - if(ret.isInSparseFormat() && ret.getSparseBlock() instanceof SparseBlockMCSR){ + + if(ret.isInSparseFormat() && ret.getSparseBlock() instanceof SparseBlockMCSR) { // we set the estimated number of non zeros per row to number of columns // to make the allocation of cells more aggressive. - ((SparseBlockMCSR)ret.getSparseBlock()).setNnzEstimatePerRow(m2.clen, m2.clen); + ((SparseBlockMCSR) ret.getSparseBlock()).setNnzEstimatePerRow(m2.clen, m2.clen); } + if(m1.denseBlock instanceof DenseBlockFP64DEDUP) ret.allocateDenseBlock(true, true); else ret.allocateBlock(); + if(ret.isInSparseFormat() && !( ret.getSparseBlock() instanceof SparseBlockMCSR)){ + throw new DMLRuntimeException("Matrix Multiplication Sparse output must be MCSR"); + } + // Detect if we should transpose skinny right side. boolean tm2 = !fixedRet && checkPrepMatrixMultRightInput(m1,m2); m2 = prepMatrixMultRightInput(m1, m2, tm2); @@ -294,7 +300,7 @@ public class LibMatrixMult ret.nonZeros = nnzCount; // post-processing (nnz maintained in parallel) - ret.examSparsity(); + ret.examSparsity(k); } catch(Exception ex) { throw new DMLRuntimeException(ex); @@ -574,10 +580,10 @@ public class LibMatrixMult } //post-processing - ret1.recomputeNonZeros(); + ret1.recomputeNonZeros(k); ret1.examSparsity(); if( ret2 != null ) { //optional second output - ret2.recomputeNonZeros(); + ret2.recomputeNonZeros(k); ret2.examSparsity(); } @@ -1770,68 +1776,154 @@ public class LibMatrixMult } } + /** + * Ultra sparse kernel with guaranteed sparse output. + * + * @param m1 Left side ultra sparse matrix + * @param m2 Right side Matrix Sparse or Dense + * @param ret Sparse output matrix + * @param rl Row start + * @param ru Row end + */ private static void matrixMultUltraSparseLeft(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret, int rl, int ru) { final int m = m1.rlen; final int n = m2.clen; - //left is ultra-sparse (IKJ) + SparseBlock a = m1.sparseBlock; - SparseBlock c = ret.sparseBlock; + SparseBlockMCSR c = (SparseBlockMCSR) ret.sparseBlock; boolean rightSparse = m2.sparse; + + if(rightSparse) + matrixMultUltraSparseSparseSparseLeft( a, m2.sparseBlock, c, m, n , rl, ru); + else + matrixMultUltraSparseDenseSparseLeftRow(a, m2.denseBlock, c, m, n, rl, ru); - for( int i=rl; i<ru; i++ ) { - if( a.isEmpty(i) ) continue; - int apos = a.pos(i); - int alen = a.size(i); - int[] aixs = a.indexes(i); - double[] avals = a.values(i); - if( alen==1 ) { - //row selection (now aggregation) with potential scaling - int aix = aixs[apos]; - int lnnz = 0; - if( rightSparse ) { //sparse right matrix (full row copy) - if( !m2.sparseBlock.isEmpty(aix) ) { - ret.rlen=m; - ret.allocateSparseRowsBlock(false); //allocation on demand - boolean ldeep = (m2.sparseBlock instanceof SparseBlockMCSR); - ret.sparseBlock.set(i, m2.sparseBlock.get(aix), ldeep); - ret.nonZeros += (lnnz = ret.sparseBlock.size(i)); - } - } - else { //dense right matrix (append all values) - lnnz = (int)m2.recomputeNonZeros(aix, aix, 0, n-1); - if( lnnz > 0 ) { - c.allocate(i, lnnz); //allocate once - double[] bvals = m2.getDenseBlock().values(aix); - for( int j=0, bix=m2.getDenseBlock().pos(aix); j<n; j++ ) - c.append(i, j, bvals[bix+j]); - ret.nonZeros += lnnz; - } + if( rl == 0 && ru == m ){ + ret.recomputeNonZeros(); + } + } + + private static void matrixMultUltraSparseDenseSparseLeftRow(SparseBlock a, DenseBlock b, SparseBlockMCSR c, int m, int n, + int rl, int ru) { + for(int i = rl; i < ru; i++) { + if(a.isEmpty(i)) + continue; + final int apos = a.pos(i); + final int alen = a.size(i); + final int[] aixs = a.indexes(i); + final double[] avals = a.values(i); + if(alen == 1) + matrixMultUltraSparseDenseSparseLeftRowOneNonZero(i, aixs[apos], avals[apos], b, c, m, n); + else + matrixMultUltraSparseDenseSparseLeftRowGeneric(i, apos, alen, aixs, avals, b, c, m, n); + } + } + + private static void matrixMultUltraSparseDenseSparseLeftRowOneNonZero(int i, int aix, double aval, DenseBlock b, + SparseBlockMCSR c, int m, int n) { + final double[] bvals = b.values(aix); + final int bix = b.pos(aix); + final int lnnz = UtilFunctions.computeNnz(bvals, bix, n); + if(lnnz == 0) + return; + else if(lnnz == 1) { + for(int j = 0; j < n; j++) { + final double bv = bvals[bix + j]; + if(bv != 0) { + SparseRowScalar s = new SparseRowScalar(j, bv * aval); + c.set(i, s, false); + break; } + } + } + else { + setSparseRowVector(lnnz, aval, bvals, bix, n, i, c); + } + } - //optional scaling if not pure selection - if( avals[apos] != 1 && lnnz > 0 ) - if(c.get(i) instanceof SparseRowScalar){ - SparseRowScalar sv = (SparseRowScalar) c.get(i); - c.set(i, new SparseRowScalar(sv.getIndex(), sv.getValue() * avals[apos]), false); - } - else - vectMultiplyInPlace(avals[apos], c.values(i), c.pos(i), c.size(i)); - + private static void setSparseRowVector(int lnnz, double aval, double[] bvals, int bix, int n, int i, SparseBlockMCSR c) { + final double[] vals = new double[lnnz]; + final int[] idx = new int[lnnz]; + for(int j = 0, o = 0; j < n; j++) { + final double bv = bvals[bix + j]; + if(bv != 0) { + vals[o] = bv * aval; + idx[o] = j; + o++; } - else { //GENERAL CASE - for( int k=apos; k<apos+alen; k++ ) { - double aval = avals[k]; - int aix = aixs[k]; - for( int j=0; j<n; j++ ) { - double cval = ret.quickGetValue(i, j); - double cvald = aval*m2.quickGetValue(aix, j); - if( cvald != 0 ) - ret.quickSetValue(i, j, cval+cvald); + } + SparseRowVector v = new SparseRowVector(vals, idx); + c.set(i, v, false); + } + + private static void matrixMultUltraSparseDenseSparseLeftRowGeneric(int i, int apos, int alen, int[] aixs, + double[] avals, DenseBlock b, SparseBlock c, int m, int n){ + for(int k = apos; k < apos + alen; k++) { + final double aval = avals[k]; + final int aix = aixs[k]; + for(int j = 0; j < n; j++) { + double cvald = aval * b.get(aix, j); + if(cvald != 0) + c.add(i, j, cvald); } } + } + + private static void matrixMultUltraSparseSparseSparseLeft(SparseBlock a, SparseBlock b, SparseBlockMCSR c, int m, + int n, int rl, int ru) { + for(int i = rl; i < ru; i++) { + if(a.isEmpty(i)) + continue; + final int apos = a.pos(i); + final int alen = a.size(i); + final int[] aixs = a.indexes(i); + final double[] avals = a.values(i); + if(alen == 1) + matrixMultUltraSparseSparseSparseLeftRowOneNonZero(i, aixs[apos], avals[apos], b, c, m, n); + else // GENERAL CASE + matrixMultUltraSparseSparseSparseLeftRowGeneric(i, apos, alen, aixs, avals, b, c, m, n); + } + } + + private static void matrixMultUltraSparseSparseSparseLeftRowOneNonZero(int i, int aix, double aval, SparseBlock b, SparseBlockMCSR c, int m, int n){ + if(!b.isEmpty(aix)) { + c.set(i, b.get(aix), true); + // optional scaling if not pure selection + if(aval != 1) { + if(c.get(i) instanceof SparseRowScalar) { + SparseRowScalar sv = (SparseRowScalar) c.get(i); + c.set(i, new SparseRowScalar(sv.getIndex(), sv.getValue() * aval), false); + } + else + vectMultiplyInPlace(aval, c.values(i), c.pos(i), c.size(i)); + } + } + } + + private static void matrixMultUltraSparseSparseSparseLeftRowGeneric(int i, int apos, int alen, int[] aixs, + double[] avals, SparseBlock b, SparseBlockMCSR c, int m, int n) { + for(int k = apos; k < apos + alen; k++) { + final double aval = avals[k]; + final int aix = aixs[k]; + final int bpos = b.pos(aix); + final int blen = b.size(aix) + bpos; + final int[] bix = b.indexes(aix); + final double[] bvals = b.values(aix); + if(!c.isAllocated(i)) + c.allocate(i, Math.max(blen - bpos, 2)); // guarantee a vector for efficiency + final SparseRowVector v = (SparseRowVector) c.get(i); + if(v.size() == n){ // If output row is dense already + final double[] vvals = v.values(); + for(int bo = bpos; bo < blen; bo++) + vvals[bix[bo]] += aval * bvals[bo]; } + else + for(int bo = bpos; bo < blen; bo++) + v.add(bix[bo], aval * bvals[bo]); + } } + private static void matrixMultUltraSparseRight(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret, int rl, int ru) { if(!ret.isInSparseFormat() && ret.getDenseBlock().isContiguous()) diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixSparseToDense.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixSparseToDense.java index 41b1048ad9..c9710b6c78 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixSparseToDense.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixSparseToDense.java @@ -24,162 +24,98 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.data.DenseBlock; -import org.apache.sysds.runtime.data.SparseBlockCSR; -import org.apache.sysds.runtime.data.SparseBlockMCSR; -import org.apache.sysds.runtime.data.SparseRowVector; +import org.apache.sysds.runtime.data.SparseBlock; import org.apache.sysds.runtime.util.CommonThreadPool; -import org.apache.sysds.runtime.util.UtilFunctions; public interface LibMatrixSparseToDense { + public static final Log LOG = LogFactory.getLog(LibMatrixSparseToDense.class.getName()); + /** - * Convert the given matrix block to a sparse allocation. + * Convert the given matrix block to a Dense allocation. * - * @param r The matrix block to modify, and return the sparse block in. - * @param allowCSR If CSR is allowed. + * @param r The matrix block to modify, to dense. + * @param k allowed. */ - public static void denseToSparse(MatrixBlock r, boolean allowCSR) { - final DenseBlock a = r.getDenseBlock(); - + public static void sparseToDense(MatrixBlock r, int k) { // set target representation, early abort on empty blocks - r.sparse = true; + final SparseBlock a = r.sparseBlock; + final int m = r.rlen; + r.sparse = false; if(a == null) return; - int k = InfrastructureAnalyzer.getLocalParallelism(); - - if(k > 1) - denseToSparseParallel(r, k, allowCSR); - else if(allowCSR && r.nonZeros <= Integer.MAX_VALUE) - denseToSparseCSR(r); - else - denseToSparseMCSR(r); - - // cleanup dense block - r.denseBlock = null; - } - - private static void denseToSparseCSR(MatrixBlock r) { - final DenseBlock a = r.getDenseBlock(); - final int m = r.rlen; - final int n = r.clen; - try { - // allocate target in memory-efficient CSR format - int lnnz = (int) r.nonZeros; - int[] rptr = new int[m + 1]; - int[] indexes = new int[lnnz]; - double[] values = new double[lnnz]; - for(int i = 0, pos = 0; i < m; i++) { - double[] avals = a.values(i); - int aix = a.pos(i); - for(int j = 0; j < n; j++) { - double aval = avals[aix + j]; - if(aval != 0) { - indexes[pos] = j; - values[pos] = aval; - pos++; - } - } - rptr[i + 1] = pos; - } - r.sparseBlock = new SparseBlockCSR(rptr, indexes, values, lnnz); - } - catch(ArrayIndexOutOfBoundsException ioobe) { - r.sparse = false; - // this means something was wrong with the sparse count. - final long nnzBefore = r.nonZeros; - final long nnzNew = r.recomputeNonZeros(); - - // try again. - if(nnzBefore != nnzNew) - denseToSparse(r, true); - else - denseToSparse(r, false); - - } - } + // allocate dense target block, but keep nnz (no need to maintain) + if(!r.allocateDenseBlock(false)) + r.denseBlock.reset(); - private static void denseToSparseMCSR(MatrixBlock r) { - final DenseBlock a = r.getDenseBlock(); + final DenseBlock c = r.denseBlock; - final int m = r.rlen; - final int n = r.clen; - // remember number non zeros. - long nnzTemp = r.getNonZeros(); - // fallback to less-memory efficient MCSR format, - // which however allows much larger sparse matrices - if(!r.allocateSparseRowsBlock()) - r.reset(); // reset if not allocated - SparseBlockMCSR sblock = (SparseBlockMCSR) r.sparseBlock; - toSparseMCSRRange(a, sblock, n, 0, m); - r.nonZeros = nnzTemp; - } + if(k > 1 && r.getNonZeros() > 1000000 && r.getNumRows() > 1) + multiThreadedToDense(a, c, m, k); + else + singleThreadedToDense(a, c, m); - private static void toSparseMCSRRange(DenseBlock a, SparseBlockMCSR b, int n, int rl, int ru) { - for(int i = rl; i < ru; i++) - toSparseMCSRRow(a, b, n, i); + // cleanup sparse block + r.sparseBlock = null; } - private static void toSparseMCSRRow(DenseBlock a, SparseBlockMCSR b, int n, int i) { - final double[] avals = a.values(i); - final int aix = a.pos(i); - // compute nnz per row (not via recomputeNonZeros as sparse allocated) - final int lnnz = UtilFunctions.computeNnz(avals, aix, n); - if(lnnz <= 0) - return; - - final double[] vals = new double[lnnz]; - final int[] idx = new int[lnnz]; - // allocate sparse row and append non-zero values - // b.allocate(i, lnnz); - - for(int j = 0, o = 0; j < n; j++) { - double v = avals[aix + j]; - if(v != 0.0) { - vals[o] = v; - idx[o] = j; - o++; - } - } - b.set(i, new SparseRowVector(vals, idx), false); + private static void singleThreadedToDense(SparseBlock a, DenseBlock c, int m) { + for(int i = 0; i < m; i++) + processRow(a, c, i); } - private static void denseToSparseParallel(MatrixBlock r, int k, boolean allowCSR) { - final DenseBlock a = r.getDenseBlock(); - - final int m = r.rlen; - final int n = r.clen; - // remember number non zeros. - final long nnzTemp = r.getNonZeros(); - // fallback to less-memory efficient MCSR format, - // which however allows much larger sparse matrices - if(!r.allocateSparseRowsBlock()) - r.reset(); // reset if not allocated - final SparseBlockMCSR b = (SparseBlockMCSR) r.sparseBlock; - final int blockSize = Math.max(250, m / k); - + private static void multiThreadedToDense(SparseBlock a, DenseBlock c, int m, int k) { ExecutorService pool = CommonThreadPool.get(k); + try { List<Future<?>> tasks = new ArrayList<>(); - for(int i = 0; i < m; i += blockSize) { + final int blkz = Math.max(1, m / k); + for(int i = 0; i < m; i += blkz) { final int start = i; - final int end = Math.min(m, i + blockSize); - tasks.add(pool.submit(() -> toSparseMCSRRange(a, b, n, start, end))); + final int end = Math.min(m, i + blkz); + tasks.add(pool.submit(() -> processRange(a, c, start, end))); } - for(Future<?> t : tasks) - t.get(); + for(Future<?> f : tasks) { + f.get(); + } } catch(Exception e) { - throw new RuntimeException(e); + throw new DMLRuntimeException("Failed parallel to dense", e); } finally { pool.shutdown(); } - r.nonZeros = nnzTemp; + } + + private static void processRange(SparseBlock a, DenseBlock c, int rl, int ru) { + for(int i = rl; i < ru; i++) + processRow(a, c, i); + } + /** + * Process row i + * + * @param a Input Sparse + * @param c Output Dense + * @param i Row to process + */ + private static void processRow(SparseBlock a, DenseBlock c, int i) { + if(!a.isEmpty(i)) { + final int apos = a.pos(i); + final int alen = a.size(i); + final int[] aix = a.indexes(i); + final double[] avals = a.values(i); + final double[] cvals = c.values(i); + final int cix = c.pos(i); + for(int j = apos; j < apos + alen; j++) + cvals[cix + aix[j]] = avals[j]; + } } } diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java index 46dc10a3e4..649df50304 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java @@ -1208,8 +1208,23 @@ public class MatrixBlock extends MatrixValue implements CacheBlock<MatrixBlock>, * Allowing CSR format is default for this operation. * */ - public void examSparsity() { - examSparsity(true); + public final void examSparsity() { + examSparsity(true, 1); + } + + /** + * Evaluates if this matrix block should be in sparse format in + * memory. Depending on the current representation, the state of the + * matrix block is changed to the right representation if necessary. + * Note that this consumes for the time of execution memory for both + * representations. + * + * Allowing CSR format is default for this operation. + * + * @param k parallelization degree + */ + public final void examSparsity(int k ) { + examSparsity(true, k); } /** @@ -1221,7 +1236,21 @@ public class MatrixBlock extends MatrixValue implements CacheBlock<MatrixBlock>, * * @param allowCSR allow CSR format on dense to sparse conversion */ - public void examSparsity(boolean allowCSR) { + public final void examSparsity(boolean allowCSR) { + examSparsity(allowCSR, 1); + } + + /** + * Evaluates if this matrix block should be in sparse format in + * memory. Depending on the current representation, the state of the + * matrix block is changed to the right representation if necessary. + * Note that this consumes for the time of execution memory for both + * representations. + * + * @param allowCSR allow CSR format on dense to sparse conversion + * @param k parallelization degree + */ + public void examSparsity(boolean allowCSR, int k) { //determine target representation boolean sparseDst = evalSparseFormatInMemory(allowCSR); @@ -1233,9 +1262,9 @@ public class MatrixBlock extends MatrixValue implements CacheBlock<MatrixBlock>, //change representation if required (also done for //empty blocks in order to set representation flags) if( sparse && !sparseDst) - sparseToDense(); + sparseToDense(k); else if( !sparse && sparseDst ) - denseToSparse(allowCSR); + denseToSparse(allowCSR, k); } public static boolean evalSparseFormatInMemory(DataCharacteristics dc) { @@ -1297,48 +1326,24 @@ public class MatrixBlock extends MatrixValue implements CacheBlock<MatrixBlock>, //////// // basic block handling functions - private void denseToSparse() { - denseToSparse(true); + private final void denseToSparse() { + denseToSparse(true, 1); } - public void denseToSparse(boolean allowCSR){ - LibMatrixDenseToSparse.denseToSparse(this, allowCSR); + public final void denseToSparse(boolean allowCSR){ + denseToSparse(allowCSR, 1); } - - public void sparseToDense() { - //set target representation, early abort on empty blocks - sparse = false; - if(sparseBlock==null) - return; - - int limit=rlen*clen; - if ( limit < 0 ) { - throw new DMLRuntimeException("Unexpected error in sparseToDense().. limit < 0: " + rlen + ", " + clen + ", " + limit); - } - - //allocate dense target block, but keep nnz (no need to maintain) - if( !allocateDenseBlock(false) ) - denseBlock.reset(); - - //copy sparse to dense - SparseBlock a = sparseBlock; - DenseBlock c = getDenseBlock(); - - for( int i=0; i<rlen; i++ ) { - if( a.isEmpty(i) ) continue; - int apos = a.pos(i); - int alen = a.size(i); - int[] aix = a.indexes(i); - double[] avals = a.values(i); - double[] cvals = c.values(i); - int cix = c.pos(i); - for( int j=apos; j<apos+alen; j++ ) - if( avals[j] != 0 ) - cvals[cix+aix[j]] = avals[j]; - } - - //cleanup sparse rows - sparseBlock = null; + + public void denseToSparse(boolean allowCSR, int k){ + LibMatrixDenseToSparse.denseToSparse(this, allowCSR, k); + } + + public final void sparseToDense() { + sparseToDense(1); + } + + public void sparseToDense(int k) { + LibMatrixSparseToDense.sparseToDense(this, k); } /**