[SYSTEMML-2046] Serialization/deserialization for large dense blocks

This patch completes the handling of large dense blocks in the serialization and deserialization code paths. Furthermore, it includes a minor extension of the dense block abstraction (a per-row non-zero count), as well as the handling of large dense blocks in block appends and initialization.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/127cc06d Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/127cc06d Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/127cc06d Branch: refs/heads/master Commit: 127cc06d96557edb77767422f66c6446d2b7beee Parents: 53014dd Author: Matthias Boehm <[email protected]> Authored: Fri Jan 5 19:56:05 2018 -0800 Committer: Matthias Boehm <[email protected]> Committed: Fri Jan 5 19:56:05 2018 -0800 ---------------------------------------------------------------------- .../sysml/runtime/matrix/data/DenseBlock.java | 11 +- .../runtime/matrix/data/DenseBlockDRB.java | 5 + .../runtime/matrix/data/DenseBlockLDRB.java | 5 + .../sysml/runtime/matrix/data/MatrixBlock.java | 164 ++++++++++--------- 4 files changed, 103 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/127cc06d/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlock.java index e2177e2..725cd7f 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlock.java @@ -129,8 +129,17 @@ public abstract class DenseBlock implements Serializable public abstract long countNonZeros(); /** + * Compute the number of non-zero values for the given row, + * which potentially makes a full pass over the underlying row. + * + * @param r row index + * @return number of non-zeros + */ + public abstract int countNonZeros(int r); + + /** * Compute the number of non-zero values, which potentially - * makes a full pass over the underlying blocks. 
+ * makes a full pass over the underlying blocks in the row range. * * @param rl row lower index * @param ru row upper index (exclusive) http://git-wip-us.apache.org/repos/asf/systemml/blob/127cc06d/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlockDRB.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlockDRB.java b/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlockDRB.java index 3cd7f52..03b7cd4 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlockDRB.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlockDRB.java @@ -111,6 +111,11 @@ public class DenseBlockDRB extends DenseBlock nnz += (a[i]!=0) ? 1 : 0; return nnz; } + + @Override + public int countNonZeros(int r) { + return UtilFunctions.computeNnz(data, r*clen, clen); + } @Override public long countNonZeros(int rl, int ru, int cl, int cu) { http://git-wip-us.apache.org/repos/asf/systemml/blob/127cc06d/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlockLDRB.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlockLDRB.java b/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlockLDRB.java index 2011629..9a4d47d 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlockLDRB.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlockLDRB.java @@ -137,6 +137,11 @@ public class DenseBlockLDRB extends DenseBlock } @Override + public int countNonZeros(int r) { + return UtilFunctions.computeNnz(values(r), pos(r), clen); + } + + @Override public long countNonZeros(int rl, int ru, int cl, int cu) { long nnz = 0; boolean rowBlock = (cl == 0 && cu == clen); http://git-wip-us.apache.org/repos/asf/systemml/blob/127cc06d/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index e8d3d76..4087e90 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -276,7 +276,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab */ public void init(double[][] arr, int r, int c) throws DMLRuntimeException - { + { //input checks if ( sparse ) throw new DMLRuntimeException("MatrixBlockDSM.init() can be invoked only on matrices with dense representation."); @@ -287,9 +287,9 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab allocateDenseBlock(); //copy and compute nnz - double[] data = getDenseBlockValues(); - for(int i=0, ix=0; i < r; i++, ix+=clen) - System.arraycopy(arr[i], 0, data, ix, arr[i].length); + DenseBlock db = getDenseBlock(); + for(int i=0; i < r; i++) + System.arraycopy(arr[i], 0, db.values(i), db.pos(i), arr[i].length); recomputeNonZeros(); } @@ -313,7 +313,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab //allocate or resize dense block allocateDenseBlock(); - //copy and compute nnz + //copy and compute nnz (guaranteed single block) System.arraycopy(arr, 0, getDenseBlockValues(), 0, arr.length); recomputeNonZeros(); } @@ -716,47 +716,53 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab if( that.sparse ) //SPARSE <- SPARSE { - SparseBlock b = that.sparseBlock; + SparseBlock a = that.sparseBlock; + SparseBlock c = sparseBlock; for( int i=0; i<that.rlen; i++ ) { - if( b.isEmpty(i) ) continue; + if( a.isEmpty(i) ) continue; int aix = rowoffset+i; //single block append (avoid re-allocations) - if( !sparseBlock.isAllocated(aix) && coloffset==0 ) { + if( !c.isAllocated(aix) && coloffset==0 
) { //note: the deep copy flag is only relevant for MCSR due to //shallow references of b.get(i); other block formats do not //require a redundant copy because b.get(i) created a new row. - boolean ldeep = (deep && b instanceof SparseBlockMCSR); - sparseBlock.set(aix, b.get(i), ldeep); + boolean ldeep = (deep && a instanceof SparseBlockMCSR); + c.set(aix, a.get(i), ldeep); } else { //general case - int pos = b.pos(i); - int len = b.size(i); - int[] ix = b.indexes(i); - double[] val = b.values(i); + int pos = a.pos(i); + int len = a.size(i); + int[] ix = a.indexes(i); + double[] val = a.values(i); if( estimatedNNzsPerRow > 0 ) - sparseBlock.allocate(aix, Math.max(estimatedNNzsPerRow, sparseBlock.size(aix)+len), clen); + c.allocate(aix, Math.max(estimatedNNzsPerRow, c.size(aix)+len), clen); else - sparseBlock.allocate(aix, sparseBlock.size(aix)+len); + c.allocate(aix, c.size(aix)+len); for( int j=pos; j<pos+len; j++ ) - sparseBlock.append(aix, coloffset+ix[j], val[j]); + c.append(aix, coloffset+ix[j], val[j]); } } } else //SPARSE <- DENSE { - double[] b = that.getDenseBlockValues(); - final int bm = that.rlen; - final int bn = that.clen; - for( int i=0, aix=rowoffset, bix=0; i<bm; i++, aix++, bix+=bn ) - for( int j=0; j<bn; j++ ) { - final double bval = b[bix+j]; + DenseBlock a = that.getDenseBlock(); + SparseBlock c = getSparseBlock(); + final int m2 = that.rlen; + final int n2 = that.clen; + for( int i=0; i<m2; i++ ) { + double[] avals = a.values(i); + int aix = a.pos(i); + int cix = rowoffset + i; + for( int j=0; j<n2; j++ ) { + double bval = avals[aix+j]; if( bval != 0 ) { - sparseBlock.allocate(aix, estimatedNNzsPerRow, clen); - sparseBlock.append(aix, coloffset+j, bval); + c.allocate(cix, estimatedNNzsPerRow, clen); + c.append(cix, coloffset+j, bval); } } + } } } @@ -778,7 +784,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab */ public void sortSparseRows(int rl, int ru) { if( !sparse || sparseBlock==null ) - return; + 
return; for( int i=rl; i<ru; i++ ) if( !sparseBlock.isEmpty(i) ) sparseBlock.sort(i); @@ -1807,38 +1813,36 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab } } - private void readDenseBlock(DataInput in) - throws IOException, DMLRuntimeException + private void readDenseBlock(DataInput in) + throws IOException, DMLRuntimeException { if( !allocateDenseBlock(false) ) //allocate block denseBlock.reset(rlen, clen); - int limit = rlen*clen; - double[] a = getDenseBlockValues(); - if( in instanceof MatrixBlockDataInput ) //fast deserialize - { + DenseBlock a = getDenseBlock(); + long nnz = 0; + if( in instanceof MatrixBlockDataInput ) { //fast deserialize MatrixBlockDataInput mbin = (MatrixBlockDataInput)in; - nonZeros = mbin.readDoubleArray(limit, a); + for( int i=0; i<a.numBlocks(); i++ ) + nnz += mbin.readDoubleArray(a.size(i), a.valuesAt(i)); } - else if( in instanceof DataInputBuffer && MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION ) - { + else if( in instanceof DataInputBuffer && MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION ) { //workaround because sequencefile.reader.next(key, value) does not yet support serialization framework DataInputBuffer din = (DataInputBuffer)in; - FastBufferedDataInputStream mbin = null; - try { - mbin = new FastBufferedDataInputStream(din); - nonZeros = mbin.readDoubleArray(limit, a); - } - finally { - IOUtilFunctions.closeSilently(mbin); + try(FastBufferedDataInputStream mbin = new FastBufferedDataInputStream(din)) { + for( int i=0; i<a.numBlocks(); i++ ) + nnz += mbin.readDoubleArray(a.size(i), a.valuesAt(i)); } } else { //default deserialize - long nnz = 0; - for( int i=0; i<limit; i++ ) - nnz += ((a[i] = in.readDouble()) != 0) ? 1 : 0; - nonZeros = nnz; + for( int i=0; i<rlen; i++ ) { + double[] avals = a.values(i); + int aix = a.pos(i); + for( int j=0; j<clen; j++ ) + nnz += ((avals[aix+j] = in.readDouble()) != 0) ? 
1 : 0; + } } + nonZeros = nnz; } private void readSparseBlock(DataInput in) @@ -1875,19 +1879,20 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab } } - private void readSparseToDense(DataInput in) - throws IOException, DMLRuntimeException + private void readSparseToDense(DataInput in) + throws IOException, DMLRuntimeException { if( !allocateDenseBlock(false) ) //allocate block denseBlock.reset(rlen, clen); - double[] a = getDenseBlockValues(); + DenseBlock a = getDenseBlock(); for(int r=0; r<rlen; r++) { int nr = in.readInt(); + double[] avals = a.values(r); + int cix = a.pos(r); for( int j=0; j<nr; j++ ) { int c = in.readInt(); - double val = in.readDouble(); - a[r*clen+c] = val; + avals[cix+c] = in.readDouble(); } } } @@ -1901,23 +1906,21 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab //adjust size and ensure reuse block is in CSR format allocateAndResetSparseRowsBlock(false, SparseBlock.Type.CSR); - if( clen > 1 ) //ULTRA-SPARSE BLOCK - { + if( clen > 1 ) { //ULTRA-SPARSE BLOCK //block: read ijv-triples (ordered by row and column) via custom //init to avoid repeated updates of row pointers per append SparseBlockCSR sblockCSR = (SparseBlockCSR) sparseBlock; sblockCSR.initUltraSparse((int)nonZeros, in); } - else //ULTRA-SPARSE COL - { + else { //ULTRA-SPARSE COL //col: read iv-pairs (should never happen since always dense) for(long i=0; i<nonZeros; i++) { int r = in.readInt(); - double val = in.readDouble(); + double val = in.readDouble(); sparseBlock.allocate(r, 1, 1); sparseBlock.append(r, 0, val); } - } + } } private void readUltraSparseToDense(DataInput in) @@ -1926,23 +1929,20 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab if( !allocateDenseBlock(false) ) //allocate block denseBlock.reset(rlen, clen); - double[] a = getDenseBlockValues(); if( clen > 1 ) { //ULTRA-SPARSE BLOCK //block: read ijv-triples + DenseBlock a = getDenseBlock(); for(long i=0; 
i<nonZeros; i++) { int r = in.readInt(); int c = in.readInt(); - double val = in.readDouble(); - a[r*clen+c] = val; + a.set(r, c, in.readDouble()); } } else { //ULTRA-SPARSE COL //col: read iv-pairs - for(long i=0; i<nonZeros; i++) { - int r = in.readInt(); - double val = in.readDouble(); - a[r] = val; - } + double[] a = getDenseBlockValues(); + for(long i=0; i<nonZeros; i++) + a[in.readInt()] = in.readDouble(); } } @@ -2135,13 +2135,14 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab writeNnzInfo( out, true ); long wnnz = 0; - double[] a = getDenseBlockValues(); - if( clen > 1 ) //ULTRA-SPARSE BLOCK - { + if( clen > 1 ) { //ULTRA-SPARSE BLOCK //block: write ijv-triples - for(int r=0, ix=0; r<rlen; r++) - for(int c=0; c<clen; c++, ix++) { - double aval = a[ix]; + DenseBlock a = getDenseBlock(); + for( int r=0; r<rlen; r++ ) { + double[] avals = a.values(r); + int aix = a.pos(r); + for( int c=0; c<clen; c++ ) { + double aval = avals[aix+c]; if( aval != 0 ) { out.writeInt(r); out.writeInt(c); @@ -2149,10 +2150,11 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab wnnz++; } } + } } - else //ULTRA-SPARSE COL - { + else { //ULTRA-SPARSE COL //col: write iv-pairs + double[] a = getDenseBlockValues(); for(int r=0; r<rlen; r++) { double aval = a[r]; if( aval != 0 ) { @@ -2175,12 +2177,13 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab out.writeByte( BlockType.SPARSE_BLOCK.ordinal() ); //block type writeNnzInfo( out, false ); - double[] a = getDenseBlockValues(); - for(int r=0, ix=0; r<rlen; r++, ix+=clen) { - int nr = (int)denseBlock.countNonZeros(r, r+1, 0, clen); - out.writeInt(nr); - for(int c=0; c<clen; c++) { - double aval = a[ix+c]; + DenseBlock a = getDenseBlock(); + for( int r=0; r<rlen; r++ ) { + double[] avals = a.values(r); + int aix = a.pos(r); + out.writeInt(a.countNonZeros(r)); + for( int c=0; c<clen; c++ ) { + double aval = avals[aix+c]; if( aval != 0 ) { 
out.writeInt(c); out.writeDouble(aval); @@ -2194,7 +2197,6 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab { //note: if ultrasparse, int always sufficient because nnz<rlen // where rlen is limited to integer - long lrlen = (long)rlen; long lclen = (long)clen; @@ -2203,7 +2205,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab nonZeros = in.readLong(); } else { - nonZeros = in.readInt(); + nonZeros = in.readInt(); } return nonZeros;
