[SYSTEMML-2280] Performance ultra-sparse/empty block serialization This patch improves the performance and memory-efficiency of ultra-sparse and empty block serialization, especially for distributed shuffle operations. In detail, we now refrain from creating custom serialization buffers (with 8KB mem buffers) for these tiny blocks, which helps reduce GC overhead. Furthermore, we added a new serialization code path for sparse blocks in COO format, which changes the serialization complexity from O(rlen+nnz) to O(nnz).
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/0bd08f29 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/0bd08f29 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/0bd08f29 Branch: refs/heads/master Commit: 0bd08f29da6973d48d9d31fa6cf9eb6309b8af73 Parents: 18cc576 Author: Matthias Boehm <[email protected]> Authored: Thu Apr 26 20:45:28 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Fri Apr 27 00:03:14 2018 -0700 ---------------------------------------------------------------------- .../spark/data/CorrMatrixBlock.java | 8 ++-- .../sysml/runtime/matrix/data/MatrixBlock.java | 41 ++++++++++++++------ 2 files changed, 35 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/0bd08f29/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java index 60ad927..574839b 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java @@ -101,7 +101,7 @@ public class CorrMatrixBlock implements Externalizable public void writeExternal(ObjectOutput os) throws IOException { - if( os instanceof ObjectOutputStream ) { + if( os instanceof ObjectOutputStream && !_value.isEmptyBlock(false) ) { //fast serialize of dense/sparse blocks ObjectOutputStream oos = (ObjectOutputStream)os; FastBufferedDataOutputStream fos = new FastBufferedDataOutputStream(oos); @@ -117,9 +117,11 @@ public class CorrMatrixBlock implements Externalizable private void writeHeaderAndPayload(DataOutput 
dos) throws IOException { - dos.writeByte((_corr!=null)?1:0); + boolean writeCorr = (_corr != null + && !_corr.isEmptyBlock(false)); + dos.writeByte(writeCorr ? 1 : 0); _value.write(dos); - if( _corr!=null ) + if( writeCorr ) _corr.write(dos); } http://git-wip-us.apache.org/repos/asf/systemml/blob/0bd08f29/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index 9e032b6..51084ef 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -373,7 +373,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab return reset; } - public void allocateAndResetSparseRowsBlock(boolean clearNNZ, SparseBlock.Type stype) + public void allocateAndResetSparseBlock(boolean clearNNZ, SparseBlock.Type stype) { //allocate block if non-existing or too small (guaranteed to be 0-initialized) if( sparseBlock == null || sparseBlock.numRows()<rlen @@ -1874,7 +1874,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab //and to allow efficient reset without repeated sparse row allocation //adjust size and ensure reuse block is in CSR format - allocateAndResetSparseRowsBlock(false, SparseBlock.Type.CSR); + allocateAndResetSparseBlock(false, SparseBlock.Type.CSR); if( clen > 1 ) { //ULTRA-SPARSE BLOCK //block: read ijv-triples (ordered by row and column) via custom @@ -2026,14 +2026,26 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab if( clen > 1 ) //ULTRA-SPARSE BLOCK { //block: write ijv-triples - for(int r=0;r<Math.min(rlen, sparseBlock.numRows()); r++) - if( !sparseBlock.isEmpty(r) ) - { + if( sparseBlock instanceof SparseBlockCOO ) { + SparseBlockCOO sblock = 
(SparseBlockCOO)sparseBlock; + int[] rix = sblock.rowIndexes(); + int[] cix = sblock.indexes(); + double[] vals = sblock.values(); + for(int i=0; i<sblock.size(); i++) { + //ultra-sparse block: write ijv-triples + out.writeInt(rix[i]); + out.writeInt(cix[i]); + out.writeDouble(vals[i]); + wnnz++; + } + } + else { + for(int r=0;r<Math.min(rlen, sparseBlock.numRows()); r++) { + if( sparseBlock.isEmpty(r) ) continue; int apos = sparseBlock.pos(r); int alen = sparseBlock.size(r); int[] aix = sparseBlock.indexes(r); double[] avals = sparseBlock.values(r); - for(int j=apos; j<apos+alen; j++) { //ultra-sparse block: write ijv-triples out.writeInt(r); @@ -2041,7 +2053,8 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab out.writeDouble(avals[j]); wnnz++; } - } + } + } } else //ULTRA-SPARSE COL { @@ -2209,7 +2222,8 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab public void readExternal(ObjectInput is) throws IOException { - if( is instanceof ObjectInputStream ) + if( is instanceof ObjectInputStream + && !(is instanceof MatrixBlockDataInput) ) { //fast deserialize of dense/sparse blocks ObjectInputStream ois = (ObjectInputStream)is; @@ -2232,7 +2246,12 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab public void writeExternal(ObjectOutput os) throws IOException { - if( os instanceof ObjectOutputStream ) { + //note: in case of a CorrMatrixBlock being wrapped around a matrix + //block, the object output is already a FastBufferedDataOutputStream; + //so in general we try to avoid unnecessary buffer allocations here. 
+ + if( os instanceof ObjectOutputStream && !isEmptyBlock(false) + && !(os instanceof MatrixBlockDataOutput) ) { //fast serialize of dense/sparse blocks ObjectOutputStream oos = (ObjectOutputStream)os; FastBufferedDataOutputStream fos = new FastBufferedDataOutputStream(oos); @@ -2828,7 +2847,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab } @Override - public void incrementalAggregate(AggregateOperator aggOp, MatrixValue correction, MatrixValue newWithCorrection) { + public void incrementalAggregate(AggregateOperator aggOp, MatrixValue correction, MatrixValue newWithCorrection, boolean deep) { //assert(aggOp.correctionExists); MatrixBlock cor=checkType(correction); MatrixBlock newWithCor=checkType(newWithCorrection); @@ -2900,7 +2919,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab { //e.g., ak+ kahan plus as used in sum, mapmult, mmcj and tsmm if(aggOp.increOp.fn instanceof KahanPlus) { - LibMatrixAgg.aggregateBinaryMatrix(newWithCor, this, cor); + LibMatrixAgg.aggregateBinaryMatrix(newWithCor, this, cor, deep); } else {
