[SYSTEMML-2280] Performance ultra-sparse/empty block serialization

This patch improves the performance and memory-efficiency of ultra-sparse
and empty block serialization, especially for distributed shuffle
operations. In detail, we now refrain from creating custom serialization
buffers (with 8KB mem buffers) for these tiny blocks, which helps reduce
GC overhead. Furthermore, we added a new serialization code path for
sparse blocks in COO format, which changes the serialization complexity
from O(rlen+nnz) to O(nnz).


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/0bd08f29
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/0bd08f29
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/0bd08f29

Branch: refs/heads/master
Commit: 0bd08f29da6973d48d9d31fa6cf9eb6309b8af73
Parents: 18cc576
Author: Matthias Boehm <[email protected]>
Authored: Thu Apr 26 20:45:28 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Fri Apr 27 00:03:14 2018 -0700

----------------------------------------------------------------------
 .../spark/data/CorrMatrixBlock.java             |  8 ++--
 .../sysml/runtime/matrix/data/MatrixBlock.java  | 41 ++++++++++++++------
 2 files changed, 35 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/0bd08f29/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java
index 60ad927..574839b 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java
@@ -101,7 +101,7 @@ public class CorrMatrixBlock implements Externalizable
        public void writeExternal(ObjectOutput os) 
                throws IOException
        {
-               if( os instanceof ObjectOutputStream ) {
+               if( os instanceof ObjectOutputStream && 
!_value.isEmptyBlock(false) ) {
                        //fast serialize of dense/sparse blocks
                        ObjectOutputStream oos = (ObjectOutputStream)os;
                        FastBufferedDataOutputStream fos = new 
FastBufferedDataOutputStream(oos);
@@ -117,9 +117,11 @@ public class CorrMatrixBlock implements Externalizable
        private void writeHeaderAndPayload(DataOutput dos) 
                throws IOException 
        {
-               dos.writeByte((_corr!=null)?1:0);
+               boolean writeCorr = (_corr != null
+                       && !_corr.isEmptyBlock(false));
+               dos.writeByte(writeCorr ? 1 : 0);
                _value.write(dos);
-               if( _corr!=null )
+               if( writeCorr )
                        _corr.write(dos);
        }
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/0bd08f29/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 9e032b6..51084ef 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -373,7 +373,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                return reset;
        }
        
-       public void allocateAndResetSparseRowsBlock(boolean clearNNZ, 
SparseBlock.Type stype)
+       public void allocateAndResetSparseBlock(boolean clearNNZ, 
SparseBlock.Type stype)
        {
                //allocate block if non-existing or too small (guaranteed to be 
0-initialized)
                if( sparseBlock == null || sparseBlock.numRows()<rlen
@@ -1874,7 +1874,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                //and to allow efficient reset without repeated sparse row 
allocation
                
                //adjust size and ensure reuse block is in CSR format
-               allocateAndResetSparseRowsBlock(false, SparseBlock.Type.CSR);
+               allocateAndResetSparseBlock(false, SparseBlock.Type.CSR);
                
                if( clen > 1 ) { //ULTRA-SPARSE BLOCK
                        //block: read ijv-triples (ordered by row and column) 
via custom 
@@ -2026,14 +2026,26 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                if( clen > 1 ) //ULTRA-SPARSE BLOCK
                {
                        //block: write ijv-triples
-                       for(int r=0;r<Math.min(rlen, sparseBlock.numRows()); 
r++)
-                               if( !sparseBlock.isEmpty(r) )
-                               {
+                       if( sparseBlock instanceof SparseBlockCOO ) {
+                               SparseBlockCOO sblock = 
(SparseBlockCOO)sparseBlock;
+                               int[] rix = sblock.rowIndexes();
+                               int[] cix = sblock.indexes();
+                               double[] vals = sblock.values();
+                               for(int i=0; i<sblock.size(); i++) {
+                                       //ultra-sparse block: write ijv-triples
+                                       out.writeInt(rix[i]);
+                                       out.writeInt(cix[i]);
+                                       out.writeDouble(vals[i]);
+                                       wnnz++;
+                               }
+                       }
+                       else {
+                               for(int r=0;r<Math.min(rlen, 
sparseBlock.numRows()); r++) {
+                                       if( sparseBlock.isEmpty(r) ) continue;
                                        int apos = sparseBlock.pos(r);
                                        int alen = sparseBlock.size(r);
                                        int[] aix = sparseBlock.indexes(r);
                                        double[] avals = sparseBlock.values(r);
-                                       
                                        for(int j=apos; j<apos+alen; j++) {
                                                //ultra-sparse block: write 
ijv-triples
                                                out.writeInt(r);
@@ -2041,7 +2053,8 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                                                out.writeDouble(avals[j]);
                                                wnnz++;
                                        }
-                               }       
+                               }
+                       }
                }
                else //ULTRA-SPARSE COL
                {
@@ -2209,7 +2222,8 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
        public void readExternal(ObjectInput is) 
                throws IOException
        {
-               if( is instanceof ObjectInputStream )
+               if( is instanceof ObjectInputStream
+                       && !(is instanceof MatrixBlockDataInput) )
                {
                        //fast deserialize of dense/sparse blocks
                        ObjectInputStream ois = (ObjectInputStream)is;
@@ -2232,7 +2246,12 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
        public void writeExternal(ObjectOutput os) 
                throws IOException
        {
-               if( os instanceof ObjectOutputStream ) {
+               //note: in case of a CorrMatrixBlock being wrapped around a 
matrix
+               //block, the object output is already a 
FastBufferedDataOutputStream;
+               //so in general we try to avoid unnecessary buffer allocations 
here.
+               
+               if( os instanceof ObjectOutputStream && !isEmptyBlock(false)
+                       && !(os instanceof MatrixBlockDataOutput) ) {
                        //fast serialize of dense/sparse blocks
                        ObjectOutputStream oos = (ObjectOutputStream)os;
                        FastBufferedDataOutputStream fos = new 
FastBufferedDataOutputStream(oos);
@@ -2828,7 +2847,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
        }
        
        @Override
-       public void incrementalAggregate(AggregateOperator aggOp, MatrixValue 
correction, MatrixValue newWithCorrection) {
+       public void incrementalAggregate(AggregateOperator aggOp, MatrixValue 
correction, MatrixValue newWithCorrection, boolean deep) {
                //assert(aggOp.correctionExists); 
                MatrixBlock cor=checkType(correction);
                MatrixBlock newWithCor=checkType(newWithCorrection);
@@ -2900,7 +2919,7 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                {
                        //e.g., ak+ kahan plus as used in sum, mapmult, mmcj 
and tsmm
                        if(aggOp.increOp.fn instanceof KahanPlus) {
-                               LibMatrixAgg.aggregateBinaryMatrix(newWithCor, 
this, cor);
+                               LibMatrixAgg.aggregateBinaryMatrix(newWithCor, 
this, cor, deep);
                        }
                        else
                        {

Reply via email to