[SYSTEMML-2224] Remove unnecessary fields from matrix blocks

This patch moves metadata such as diag (i.e., the flag that a matrix has non-zeros only on its diagonal) from the matrix block to the matrix object, and drops the grouped-aggregate-specific numGroups field, in order to make the core block more compact. This is especially important for large distributed ultra-sparse matrices, where these fields cause unnecessary GC overhead whenever a large fraction of the blocks is empty but cannot be filtered out because they are required for correctness.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/c5161456 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/c5161456 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/c5161456 Branch: refs/heads/master Commit: c5161456fe5d1a4e9cad7a2ce93dbdec291c2db1 Parents: 2b3aefe Author: Matthias Boehm <[email protected]> Authored: Sat Mar 31 16:23:46 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Sat Mar 31 16:23:46 2018 -0700 ---------------------------------------------------------------------- .../controlprogram/caching/MatrixObject.java | 19 +++++++-- .../instructions/cp/ReorgCPInstruction.java | 2 + .../apache/sysml/runtime/io/MatrixWriter.java | 33 +++++++------- .../sysml/runtime/io/WriterBinaryBlock.java | 4 +- .../sysml/runtime/io/WriterBinaryCell.java | 3 +- .../sysml/runtime/io/WriterMatrixMarket.java | 2 +- .../apache/sysml/runtime/io/WriterTextCSV.java | 2 +- .../apache/sysml/runtime/io/WriterTextCell.java | 2 +- .../runtime/matrix/data/LibMatrixReorg.java | 4 +- .../sysml/runtime/matrix/data/MatrixBlock.java | 45 +++++--------------- .../sysml/runtime/util/DataConverter.java | 21 ++++----- 11 files changed, 58 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java index de49222..9714a19 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java @@ -71,14 +71,15 @@ public class MatrixObject extends 
CacheableData<MatrixBlock> //additional matrix-specific flags private UpdateType _updateType = UpdateType.COPY; - + private boolean _diag = false; + //information relevant to partitioned matrices. private boolean _partitioned = false; //indicates if obj partitioned private PDataPartitionFormat _partitionFormat = null; //indicates how obj partitioned private int _partitionSize = -1; //indicates n for BLOCKWISE_N private String _partitionCacheName = null; //name of cache block private MatrixBlock _partitionInMemory = null; - + /** * Constructor that takes the value type and the HDFS filename. * @@ -119,6 +120,7 @@ public class MatrixObject extends CacheableData<MatrixBlock> metaOld.getOutputInfo(), metaOld.getInputInfo()); _updateType = mo._updateType; + _diag = mo._diag; _partitioned = mo._partitioned; _partitionFormat = mo._partitionFormat; _partitionSize = mo._partitionSize; @@ -133,6 +135,14 @@ public class MatrixObject extends CacheableData<MatrixBlock> return _updateType; } + public boolean isDiag() { + return _diag; + } + + public void setDiag(boolean diag) { + _diag = diag; + } + @Override public void updateMatrixCharacteristics (MatrixCharacteristics mc) { _metaData.getMatrixCharacteristics().set(mc); @@ -531,10 +541,11 @@ public class MatrixObject extends CacheableData<MatrixBlock> if ( oinfo == OutputInfo.BinaryBlockOutputInfo && DMLScript.rtplatform == RUNTIME_PLATFORM.SINGLE_NODE && (mc.getRowsPerBlock() != ConfigurationManager.getBlocksize() || mc.getColsPerBlock() != ConfigurationManager.getBlocksize()) ) { - DataConverter.writeMatrixToHDFS(_data, fname, oinfo, new MatrixCharacteristics(mc.getRows(), mc.getCols(), ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize(), mc.getNonZeros()), rep, fprop); + DataConverter.writeMatrixToHDFS(_data, fname, oinfo, new MatrixCharacteristics(mc.getRows(), mc.getCols(), + ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize(), mc.getNonZeros()), rep, fprop, _diag); } else { - 
DataConverter.writeMatrixToHDFS(_data, fname, oinfo, mc, rep, fprop); + DataConverter.writeMatrixToHDFS(_data, fname, oinfo, mc, rep, fprop, _diag); } if( LOG.isTraceEnabled() ) http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java index af4c12e..89d99b7 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/ReorgCPInstruction.java @@ -144,5 +144,7 @@ public class ReorgCPInstruction extends UnaryCPInstruction { ec.releaseMatrixInput(_col.getName()); ec.releaseMatrixInput(input1.getName(), getExtendedOpcode()); ec.setMatrixOutput(output.getName(), soresBlock, getExtendedOpcode()); + if( r_op.fn instanceof DiagIndex && soresBlock.getNumColumns()>1 ) //diagV2M + ec.getMatrixObject(output.getName()).setDiag(true); } } http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/io/MatrixWriter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixWriter.java b/src/main/java/org/apache/sysml/runtime/io/MatrixWriter.java index 45ba2d9..126a772 100644 --- a/src/main/java/org/apache/sysml/runtime/io/MatrixWriter.java +++ b/src/main/java/org/apache/sysml/runtime/io/MatrixWriter.java @@ -21,7 +21,6 @@ package org.apache.sysml.runtime.io; import java.io.IOException; -import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.matrix.data.MatrixBlock; /** @@ -33,9 +32,15 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock; */ public abstract class MatrixWriter { + public void writeMatrixToHDFS( 
MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz ) + throws IOException + { + writeMatrixToHDFS(src, fname, rlen, clen, brlen, bclen, nnz, false); + } - public abstract void writeMatrixToHDFS( MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz ) - throws IOException, DMLRuntimeException; + public abstract void writeMatrixToHDFS( MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz, boolean diag ) + throws IOException; + /** * Writes a minimal entry to represent an empty matrix on hdfs. @@ -50,33 +55,28 @@ public abstract class MatrixWriter public abstract void writeEmptyMatrixToHDFS( String fname, long rlen, long clen, int brlen, int bclen ) throws IOException; - public static MatrixBlock[] createMatrixBlocksForReuse( long rlen, long clen, int brlen, int bclen, boolean sparse, long nonZeros ) - { + public static MatrixBlock[] createMatrixBlocksForReuse( long rlen, long clen, int brlen, int bclen, boolean sparse, long nonZeros ) { MatrixBlock[] blocks = new MatrixBlock[4]; double sparsity = ((double)nonZeros)/(rlen*clen); long estNNZ = -1; //full block - if( rlen >= brlen && clen >= bclen ) - { + if( rlen >= brlen && clen >= bclen ) { estNNZ = (long) (brlen*bclen*sparsity); blocks[0] = new MatrixBlock( brlen, bclen, sparse, (int)estNNZ ); } //partial col block - if( rlen >= brlen && clen%bclen!=0 ) - { + if( rlen >= brlen && clen%bclen!=0 ) { estNNZ = (long) (brlen*(clen%bclen)*sparsity); blocks[1] = new MatrixBlock( brlen, (int)(clen%bclen), sparse, (int)estNNZ ); } //partial row block - if( rlen%brlen!=0 && clen>=bclen ) - { + if( rlen%brlen!=0 && clen>=bclen ) { estNNZ = (long) ((rlen%brlen)*bclen*sparsity); blocks[2] = new MatrixBlock( (int)(rlen%brlen), bclen, sparse, (int)estNNZ ); } //partial row/col block - if( rlen%brlen!=0 && clen%bclen!=0 ) - { + if( rlen%brlen!=0 && clen%bclen!=0 ) { estNNZ = (long) ((rlen%brlen)*(clen%bclen)*sparsity); blocks[3] = new 
MatrixBlock( (int)(rlen%brlen), (int)(clen%bclen), sparse, (int)estNNZ ); } @@ -85,16 +85,14 @@ public abstract class MatrixWriter for( MatrixBlock b : blocks ) if( b != null ) if( !sparse ) - b.allocateDenseBlockUnsafe(b.getNumRows(), b.getNumColumns()); + b.allocateDenseBlockUnsafe(b.getNumRows(), b.getNumColumns()); //NOTE: no preallocation for sparse (preallocate sparserows with estnnz) in order to reduce memory footprint return blocks; } - public static MatrixBlock getMatrixBlockForReuse( MatrixBlock[] blocks, int rows, int cols, int brlen, int bclen ) - { + public static MatrixBlock getMatrixBlockForReuse( MatrixBlock[] blocks, int rows, int cols, int brlen, int bclen ) { int index = -1; - if( rows==brlen && cols==bclen ) index = 0; else if( rows==brlen && cols<bclen ) @@ -103,7 +101,6 @@ public abstract class MatrixWriter index = 2; else //if( rows<brlen && cols<bclen ) index = 3; - return blocks[ index ]; } } http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java index 59fe573..777828c 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlock.java @@ -45,7 +45,7 @@ public class WriterBinaryBlock extends MatrixWriter } @Override - public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz) + public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz, boolean diag) throws IOException, DMLRuntimeException { //prepare file access @@ -61,7 +61,7 @@ public class WriterBinaryBlock extends MatrixWriter MRJobConfiguration.addBinaryBlockSerializationFramework( job ); //core write 
sequential/parallel - if( src.isDiag() ) + if( diag ) writeDiagBinaryBlockMatrixToHDFS(path, job, fs, src, rlen, clen, brlen, bclen); else writeBinaryBlockMatrixToHDFS(path, job, fs, src, rlen, clen, brlen, bclen); http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java index a072a6b..b4fe69f 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryCell.java @@ -36,9 +36,8 @@ import org.apache.sysml.runtime.util.MapReduceTool; public class WriterBinaryCell extends MatrixWriter { - @Override - public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz) + public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz, boolean diag) throws IOException, DMLRuntimeException { //prepare file access http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java index c9f42bf..2115a19 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarket.java @@ -42,7 +42,7 @@ import org.apache.sysml.runtime.util.MapReduceTool; public class WriterMatrixMarket extends MatrixWriter { @Override - public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz) + public final void writeMatrixToHDFS(MatrixBlock 
src, String fname, long rlen, long clen, int brlen, int bclen, long nnz, boolean diag) throws IOException, DMLRuntimeException { //validity check matrix dimensions http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java index a3015f2..3e89a52 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSV.java @@ -54,7 +54,7 @@ public class WriterTextCSV extends MatrixWriter } @Override - public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz) + public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz, boolean diag) throws IOException, DMLRuntimeException { //validity check matrix dimensions http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java index 0438f46..413a165 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCell.java @@ -37,7 +37,7 @@ import org.apache.sysml.runtime.util.MapReduceTool; public class WriterTextCell extends MatrixWriter { @Override - public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz) + public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int brlen, int bclen, long nnz, boolean diag) throws IOException, 
DMLRuntimeException { //validity check matrix dimensions http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java index 8f7bda9..42c56c4 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java @@ -290,10 +290,8 @@ public class LibMatrixReorg int rlen = in.rlen; int clen = in.clen; - if( clen == 1 ){ //diagV2M + if( clen == 1 ) //diagV2M diagV2M( in, out ); - out.setDiag(); - } else if ( rlen == clen ) //diagM2V diagM2V( in, out ); else http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index 9807340..f19fbe5 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -131,13 +131,6 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab //sparse-block-specific attributes (allocation only) protected int estimatedNNzsPerRow = -1; - //grpaggregate-specific attributes (optional) - protected int numGroups = -1; - - //diag-specific attributes (optional) - protected boolean diag = false; - - //////// // Matrix Constructors // @@ -253,16 +246,10 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab (int)Math.ceil((double)estnnz/(double)rlen); //reset sparse/dense blocks - if( sparse ) { + if( sparse ) resetSparse(); - } - 
else { + else resetDense(val); - } - - //reset operation-specific attributes - numGroups = -1; - diag = false; } private void resetSparse() { @@ -505,15 +492,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab } return ret; } - - public void setDiag() { - diag = true; - } - - public boolean isDiag() { - return diag; - } - + //////// // Data handling @@ -4723,35 +4702,31 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab if( this.getNumRows() != target.getNumRows() && this.getNumRows() !=Math.max(target.getNumRows(),target.getNumColumns()) || (weights != null && this.getNumRows() != weights.getNumRows()) ) throw new DMLRuntimeException("groupedAggregate can only operate on matrices with equal dimensions."); - // obtain numGroups from instruction, if provided - if (ngroups > 0) - numGroups = ngroups; - // Determine the number of groups - if( numGroups <= 0 ) { //reuse if available + if( ngroups <= 0 ) { //reuse if available double min = this.min(); double max = this.max(); if ( min <= 0 ) throw new DMLRuntimeException("Invalid value (" + min + ") encountered in 'groups' while computing groupedAggregate"); if ( max <= 0 ) throw new DMLRuntimeException("Invalid value (" + max + ") encountered in 'groups' while computing groupedAggregate."); - numGroups = (int) max; + ngroups = (int) max; } // Allocate result matrix boolean rowVector = (target.getNumRows()==1 && target.getNumColumns()>1); MatrixBlock result = checkType(ret); - boolean result_sparsity = estimateSparsityOnGroupedAgg(rlen, numGroups); + boolean result_sparsity = estimateSparsityOnGroupedAgg(rlen, ngroups); if(result==null) - result=new MatrixBlock(numGroups, rowVector?1:target.getNumColumns(), result_sparsity); + result=new MatrixBlock(ngroups, rowVector?1:target.getNumColumns(), result_sparsity); else - result.reset(numGroups, rowVector?1:target.getNumColumns(), result_sparsity); + result.reset(ngroups, rowVector?1:target.getNumColumns(), 
result_sparsity); //execute grouped aggregate operation if( k > 1 ) - LibMatrixAgg.groupedAggregate(this, target, weights, result, numGroups, op, k); + LibMatrixAgg.groupedAggregate(this, target, weights, result, ngroups, op, k); else - LibMatrixAgg.groupedAggregate(this, target, weights, result, numGroups, op); + LibMatrixAgg.groupedAggregate(this, target, weights, result, ngroups, op); return result; } http://git-wip-us.apache.org/repos/asf/systemml/blob/c5161456/src/main/java/org/apache/sysml/runtime/util/DataConverter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java index 82eee9d..8748de2 100644 --- a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java +++ b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java @@ -67,22 +67,19 @@ public class DataConverter /////// public static void writeMatrixToHDFS(MatrixBlock mat, String dir, OutputInfo outputinfo, MatrixCharacteristics mc ) - throws IOException - { + throws IOException { writeMatrixToHDFS(mat, dir, outputinfo, mc, -1, null); } public static void writeMatrixToHDFS(MatrixBlock mat, String dir, OutputInfo outputinfo, MatrixCharacteristics mc, int replication, FileFormatProperties formatProperties) - throws IOException - { - try { - MatrixWriter writer = MatrixWriterFactory.createMatrixWriter( outputinfo, replication, formatProperties ); - writer.writeMatrixToHDFS(mat, dir, mc.getRows(), mc.getCols(), mc.getRowsPerBlock(), mc.getColsPerBlock(), mc.getNonZeros()); - } - catch(Exception e) - { - throw new IOException(e); - } + throws IOException { + writeMatrixToHDFS(mat, dir, outputinfo, mc, -1, null, false); + } + + public static void writeMatrixToHDFS(MatrixBlock mat, String dir, OutputInfo outputinfo, MatrixCharacteristics mc, int replication, FileFormatProperties formatProperties, boolean diag) + throws IOException { + 
MatrixWriter writer = MatrixWriterFactory.createMatrixWriter( outputinfo, replication, formatProperties ); + writer.writeMatrixToHDFS(mat, dir, mc.getRows(), mc.getCols(), mc.getRowsPerBlock(), mc.getColsPerBlock(), mc.getNonZeros(), diag); } public static MatrixBlock readMatrixFromHDFS(String dir, InputInfo inputinfo, long rlen, long clen, int brlen, int bclen, boolean localFS)
