[SYSTEMML-2046] Serialization/deserialization for large dense blocks

This patch completes the handling of large dense blocks in the
serialization and deserialization code paths. It also includes a minor
extension of the dense block abstraction (a per-row countNonZeros) and
the handling of large dense blocks in block appends and initialization.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/127cc06d
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/127cc06d
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/127cc06d

Branch: refs/heads/master
Commit: 127cc06d96557edb77767422f66c6446d2b7beee
Parents: 53014dd
Author: Matthias Boehm <[email protected]>
Authored: Fri Jan 5 19:56:05 2018 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Fri Jan 5 19:56:05 2018 -0800

----------------------------------------------------------------------
 .../sysml/runtime/matrix/data/DenseBlock.java   |  11 +-
 .../runtime/matrix/data/DenseBlockDRB.java      |   5 +
 .../runtime/matrix/data/DenseBlockLDRB.java     |   5 +
 .../sysml/runtime/matrix/data/MatrixBlock.java  | 164 ++++++++++---------
 4 files changed, 103 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/127cc06d/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlock.java
index e2177e2..725cd7f 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlock.java
@@ -129,8 +129,17 @@ public abstract class DenseBlock implements Serializable
        public abstract long countNonZeros();
        
        /**
+        * Compute the number of non-zero values for the given row,
+        * which potentially makes a full pass over the underlying row.
+        * 
+        * @param r row index
+        * @return number of non-zeros
+        */
+       public abstract int countNonZeros(int r);
+       
+       /**
         * Compute the number of non-zero values, which potentially 
-        * makes a full pass over the underlying blocks.
+        * makes a full pass over the underlying blocks in the row range.
         * 
         * @param rl row lower index
         * @param ru row upper index (exclusive)

http://git-wip-us.apache.org/repos/asf/systemml/blob/127cc06d/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlockDRB.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlockDRB.java b/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlockDRB.java
index 3cd7f52..03b7cd4 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlockDRB.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlockDRB.java
@@ -111,6 +111,11 @@ public class DenseBlockDRB extends DenseBlock
                        nnz += (a[i]!=0) ? 1 : 0;
                return nnz;
        }
+       
+       @Override
+       public int countNonZeros(int r) {
+               return UtilFunctions.computeNnz(data, r*clen, clen);
+       }
 
        @Override
        public long countNonZeros(int rl, int ru, int cl, int cu) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/127cc06d/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlockLDRB.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlockLDRB.java b/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlockLDRB.java
index 2011629..9a4d47d 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlockLDRB.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/DenseBlockLDRB.java
@@ -137,6 +137,11 @@ public class DenseBlockLDRB extends DenseBlock
        }
        
        @Override
+       public int countNonZeros(int r) {
+               return UtilFunctions.computeNnz(values(r), pos(r), clen);
+       }
+       
+       @Override
        public long countNonZeros(int rl, int ru, int cl, int cu) {
                long nnz = 0;
                boolean rowBlock = (cl == 0 && cu == clen);

http://git-wip-us.apache.org/repos/asf/systemml/blob/127cc06d/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index e8d3d76..4087e90 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -276,7 +276,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
         */
        public void init(double[][] arr, int r, int c) 
                throws DMLRuntimeException 
-       {       
+       {
                //input checks 
                if ( sparse )
                        throw new DMLRuntimeException("MatrixBlockDSM.init() 
can be invoked only on matrices with dense representation.");
@@ -287,9 +287,9 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                allocateDenseBlock();
                
                //copy and compute nnz
-               double[] data = getDenseBlockValues();
-               for(int i=0, ix=0; i < r; i++, ix+=clen) 
-                       System.arraycopy(arr[i], 0, data, ix, arr[i].length);
+               DenseBlock db = getDenseBlock();
+               for(int i=0; i < r; i++)
+                       System.arraycopy(arr[i], 0, db.values(i), db.pos(i), 
arr[i].length);
                recomputeNonZeros();
        }
        
@@ -313,7 +313,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                //allocate or resize dense block
                allocateDenseBlock();
                
-               //copy and compute nnz 
+               //copy and compute nnz (guaranteed single block)
                System.arraycopy(arr, 0, getDenseBlockValues(), 0, arr.length);
                recomputeNonZeros();
        }
@@ -716,47 +716,53 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                
                if( that.sparse ) //SPARSE <- SPARSE
                {
-                       SparseBlock b = that.sparseBlock;
+                       SparseBlock a = that.sparseBlock;
+                       SparseBlock c = sparseBlock;
                        for( int i=0; i<that.rlen; i++ )
                        {
-                               if( b.isEmpty(i) ) continue;
+                               if( a.isEmpty(i) ) continue;
                                int aix = rowoffset+i;
                                
                                //single block append (avoid re-allocations)
-                               if( !sparseBlock.isAllocated(aix) && 
coloffset==0 ) { 
+                               if( !c.isAllocated(aix) && coloffset==0 ) { 
                                        //note: the deep copy flag is only 
relevant for MCSR due to
                                        //shallow references of b.get(i); other 
block formats do not
                                        //require a redundant copy because 
b.get(i) created a new row.
-                                       boolean ldeep = (deep && b instanceof 
SparseBlockMCSR);
-                                       sparseBlock.set(aix, b.get(i), ldeep);
+                                       boolean ldeep = (deep && a instanceof 
SparseBlockMCSR);
+                                       c.set(aix, a.get(i), ldeep);
                                }
                                else { //general case
-                                       int pos = b.pos(i);
-                                       int len = b.size(i);
-                                       int[] ix = b.indexes(i);
-                                       double[] val = b.values(i);
+                                       int pos = a.pos(i);
+                                       int len = a.size(i);
+                                       int[] ix = a.indexes(i);
+                                       double[] val = a.values(i);
                                        if( estimatedNNzsPerRow > 0 )
-                                               sparseBlock.allocate(aix, 
Math.max(estimatedNNzsPerRow, sparseBlock.size(aix)+len), clen);
+                                               c.allocate(aix, 
Math.max(estimatedNNzsPerRow, c.size(aix)+len), clen);
                                        else
-                                               sparseBlock.allocate(aix, 
sparseBlock.size(aix)+len);
+                                               c.allocate(aix, 
c.size(aix)+len);
                                        for( int j=pos; j<pos+len; j++ )
-                                               sparseBlock.append(aix, 
coloffset+ix[j], val[j]);
+                                               c.append(aix, coloffset+ix[j], 
val[j]);
                                }
                        }
                }
                else //SPARSE <- DENSE
                {
-                       double[] b = that.getDenseBlockValues();
-                       final int bm = that.rlen;
-                       final int bn = that.clen;
-                       for( int i=0, aix=rowoffset, bix=0; i<bm; i++, aix++, 
bix+=bn )
-                               for( int j=0; j<bn; j++ ) {
-                                       final double bval = b[bix+j];
+                       DenseBlock a = that.getDenseBlock();
+                       SparseBlock c = getSparseBlock();
+                       final int m2 = that.rlen;
+                       final int n2 = that.clen;
+                       for( int i=0; i<m2; i++ ) {
+                               double[] avals = a.values(i);
+                               int aix = a.pos(i);
+                               int cix = rowoffset + i;
+                               for( int j=0; j<n2; j++ ) {
+                                       double bval = avals[aix+j];
                                        if( bval != 0 ) {
-                                               sparseBlock.allocate(aix, 
estimatedNNzsPerRow, clen);
-                                               sparseBlock.append(aix, 
coloffset+j, bval);
+                                               c.allocate(cix, 
estimatedNNzsPerRow, clen);
+                                               c.append(cix, coloffset+j, 
bval);
                                        }
                                }
+                       }
                }
        }
        
@@ -778,7 +784,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
         */
        public void sortSparseRows(int rl, int ru) {
                if( !sparse || sparseBlock==null )
-                       return;         
+                       return;
                for( int i=rl; i<ru; i++ )
                        if( !sparseBlock.isEmpty(i) )
                                sparseBlock.sort(i);
@@ -1807,38 +1813,36 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                }
        }
 
-       private void readDenseBlock(DataInput in) 
-               throws IOException, DMLRuntimeException 
+       private void readDenseBlock(DataInput in)
+               throws IOException, DMLRuntimeException
        {
                if( !allocateDenseBlock(false) ) //allocate block
                        denseBlock.reset(rlen, clen);
                
-               int limit = rlen*clen;
-               double[] a = getDenseBlockValues();
-               if( in instanceof MatrixBlockDataInput ) //fast deserialize
-               {
+               DenseBlock a = getDenseBlock();
+               long nnz = 0;
+               if( in instanceof MatrixBlockDataInput ) { //fast deserialize
                        MatrixBlockDataInput mbin = (MatrixBlockDataInput)in;
-                       nonZeros = mbin.readDoubleArray(limit, a);
+                       for( int i=0; i<a.numBlocks(); i++ )
+                               nnz += mbin.readDoubleArray(a.size(i), 
a.valuesAt(i));
                }
-               else if( in instanceof DataInputBuffer && 
MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION ) 
-               {
+               else if( in instanceof DataInputBuffer && 
MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION ) {
                        //workaround because sequencefile.reader.next(key, 
value) does not yet support serialization framework
                        DataInputBuffer din = (DataInputBuffer)in;
-                       FastBufferedDataInputStream mbin = null;
-                       try {
-                               mbin = new FastBufferedDataInputStream(din);
-                               nonZeros = mbin.readDoubleArray(limit, a);
-                       }
-                       finally {
-                               IOUtilFunctions.closeSilently(mbin);
+                       try(FastBufferedDataInputStream mbin = new 
FastBufferedDataInputStream(din)) {
+                               for( int i=0; i<a.numBlocks(); i++ )
+                                       nnz += mbin.readDoubleArray(a.size(i), 
a.valuesAt(i));
                        }
                }
                else { //default deserialize
-                       long nnz = 0;
-                       for( int i=0; i<limit; i++ )
-                               nnz += ((a[i] = in.readDouble()) != 0) ? 1 : 0;
-                       nonZeros = nnz;
+                       for( int i=0; i<rlen; i++ ) {
+                               double[] avals = a.values(i);
+                               int aix = a.pos(i);
+                               for( int j=0; j<clen; j++ )
+                                       nnz += ((avals[aix+j] = 
in.readDouble()) != 0) ? 1 : 0;
+                       }
                }
+               nonZeros = nnz;
        }
        
        private void readSparseBlock(DataInput in) 
@@ -1875,19 +1879,20 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                }
        }
 
-       private void readSparseToDense(DataInput in) 
-               throws IOException, DMLRuntimeException 
+       private void readSparseToDense(DataInput in)
+               throws IOException, DMLRuntimeException
        {
                if( !allocateDenseBlock(false) ) //allocate block
                        denseBlock.reset(rlen, clen);
                
-               double[] a = getDenseBlockValues();
+               DenseBlock a = getDenseBlock();
                for(int r=0; r<rlen; r++) {
                        int nr = in.readInt();
+                       double[] avals = a.values(r);
+                       int cix = a.pos(r);
                        for( int j=0; j<nr; j++ ) {
                                int c = in.readInt();
-                               double val = in.readDouble();
-                               a[r*clen+c] = val;
+                               avals[cix+c] = in.readDouble();
                        }
                }
        }
@@ -1901,23 +1906,21 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                //adjust size and ensure reuse block is in CSR format
                allocateAndResetSparseRowsBlock(false, SparseBlock.Type.CSR);
                
-               if( clen > 1 ) //ULTRA-SPARSE BLOCK
-               { 
+               if( clen > 1 ) { //ULTRA-SPARSE BLOCK
                        //block: read ijv-triples (ordered by row and column) 
via custom 
                        //init to avoid repeated updates of row pointers per 
append
                        SparseBlockCSR sblockCSR = (SparseBlockCSR) sparseBlock;
                        sblockCSR.initUltraSparse((int)nonZeros, in);
                }
-               else //ULTRA-SPARSE COL
-               {
+               else { //ULTRA-SPARSE COL
                        //col: read iv-pairs (should never happen since always 
dense)
                        for(long i=0; i<nonZeros; i++) {
                                int r = in.readInt();
-                               double val = in.readDouble();           
+                               double val = in.readDouble();
                                sparseBlock.allocate(r, 1, 1);
                                sparseBlock.append(r, 0, val);
                        }
-               }       
+               }
        }
 
        private void readUltraSparseToDense(DataInput in) 
@@ -1926,23 +1929,20 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                if( !allocateDenseBlock(false) ) //allocate block
                        denseBlock.reset(rlen, clen);
                
-               double[] a = getDenseBlockValues();
                if( clen > 1 ) { //ULTRA-SPARSE BLOCK
                        //block: read ijv-triples
+                       DenseBlock a = getDenseBlock();
                        for(long i=0; i<nonZeros; i++) {
                                int r = in.readInt();
                                int c = in.readInt();
-                               double val = in.readDouble();
-                               a[r*clen+c] = val;
+                               a.set(r, c, in.readDouble());
                        }
                }
                else { //ULTRA-SPARSE COL
                        //col: read iv-pairs
-                       for(long i=0; i<nonZeros; i++) {
-                               int r = in.readInt();
-                               double val = in.readDouble();
-                               a[r] = val;
-                       }
+                       double[] a = getDenseBlockValues();
+                       for(long i=0; i<nonZeros; i++)
+                               a[in.readInt()] = in.readDouble();
                }
        }
        
@@ -2135,13 +2135,14 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                writeNnzInfo( out, true );
 
                long wnnz = 0;
-               double[] a = getDenseBlockValues();
-               if( clen > 1 ) //ULTRA-SPARSE BLOCK
-               {
+               if( clen > 1 ) { //ULTRA-SPARSE BLOCK
                        //block: write ijv-triples
-                       for(int r=0, ix=0; r<rlen; r++)
-                               for(int c=0; c<clen; c++, ix++) {
-                                       double aval = a[ix];
+                       DenseBlock a = getDenseBlock();
+                       for( int r=0; r<rlen; r++ ) {
+                               double[] avals = a.values(r);
+                               int aix = a.pos(r);
+                               for( int c=0; c<clen; c++ ) {
+                                       double aval = avals[aix+c];
                                        if( aval != 0 ) {
                                                out.writeInt(r);
                                                out.writeInt(c);
@@ -2149,10 +2150,11 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                                                wnnz++;
                                        }
                                }
+                       }
                }
-               else //ULTRA-SPARSE COL
-               {
+               else { //ULTRA-SPARSE COL
                        //col: write iv-pairs
+                       double[] a = getDenseBlockValues();
                        for(int r=0; r<rlen; r++) {
                                double aval = a[r];
                                if( aval != 0 ) {
@@ -2175,12 +2177,13 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                out.writeByte( BlockType.SPARSE_BLOCK.ordinal() ); //block type
                writeNnzInfo( out, false );
                
-               double[] a = getDenseBlockValues();
-               for(int r=0, ix=0; r<rlen; r++, ix+=clen) {
-                       int nr = (int)denseBlock.countNonZeros(r, r+1, 0, clen);
-                       out.writeInt(nr);
-                       for(int c=0; c<clen; c++) {
-                               double aval = a[ix+c];
+               DenseBlock a = getDenseBlock();
+               for( int r=0; r<rlen; r++ ) {
+                       double[] avals = a.values(r);
+                       int aix = a.pos(r);
+                       out.writeInt(a.countNonZeros(r));
+                       for( int c=0; c<clen; c++ ) {
+                               double aval = avals[aix+c];
                                if( aval != 0 ) {
                                        out.writeInt(c);
                                        out.writeDouble(aval);
@@ -2194,7 +2197,6 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
        {
                //note: if ultrasparse, int always sufficient because nnz<rlen
                // where rlen is limited to integer
-                               
                long lrlen = (long)rlen;
                long lclen = (long)clen;
                
@@ -2203,7 +2205,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                        nonZeros = in.readLong();
                }
                else {
-                       nonZeros = in.readInt(); 
+                       nonZeros = in.readInt();
                }
                
                return nonZeros;

Reply via email to