Repository: systemml Updated Branches: refs/heads/master 667aeb2b7 -> fcfbd3d24
[SYSTEMML-1838] Performance sparse/ultra-sparse binary block read

This patch makes a number of performance improvements for CP read (single- and multi-threaded) of sparse and ultra-sparse matrices in binary block format:
(1) Allocate the reuse block in CSR format. This avoids unnecessary reallocations in the presence of a mix of sparse and ultra-sparse blocks, because ultra-sparse blocks clear any reuse block that is not in CSR.
(2) Add a special CSR initialization for sparse blocks read from input streams (in addition to the existing initialization for ultra-sparse blocks).
(3) Exploit the estimated number of non-zeros on sparse block append (also used during read). This changes the allocation growth factor from 1.1x to 2x until the estimated number of non-zeros per row is reached, and hence avoids unnecessary reallocations.
(4) Improve load balance by using more tasks for the final sort of sparse rows.

Together, these changes improved the CP read time of a 1M x 1M matrix with sparsity 0.001 (a mix of sparse and ultra-sparse blocks; roughly 12GB in size and 1M blocks) from 41s to 32.4s.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/9a275acb Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/9a275acb Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/9a275acb Branch: refs/heads/master Commit: 9a275acb0dd3f991fad7950d1824a89e109a9b43 Parents: 667aeb2 Author: Matthias Boehm <[email protected]> Authored: Mon Aug 14 19:11:52 2017 -0700 Committer: Matthias Boehm <[email protected]> Committed: Tue Aug 15 12:48:25 2017 -0700 ---------------------------------------------------------------------- .../controlprogram/caching/CacheDataInput.java | 10 +++-- .../apache/sysml/runtime/io/MatrixReader.java | 7 ++-- .../sysml/runtime/io/ReaderBinaryBlock.java | 32 ++++++++++----- .../runtime/io/ReaderBinaryBlockParallel.java | 15 +++---- .../sysml/runtime/matrix/data/MatrixBlock.java | 26 ++++++------- .../matrix/data/MatrixBlockDataInput.java | 5 ++- .../runtime/matrix/data/SparseBlockCSR.java | 41 ++++++++++++++++---- .../runtime/matrix/data/SparseRowVector.java | 7 ++-- .../util/FastBufferedDataInputStream.java | 29 ++++++++------ 9 files changed, 110 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataInput.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataInput.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataInput.java index 24df6d4..8f16ae1 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataInput.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataInput.java @@ -163,11 +163,11 @@ public class CacheDataInput implements DataInput, MatrixBlockDataInput } @Override - public long 
readSparseRows(int rlen, SparseBlock rows) + public long readSparseRows(int rlen, long nnz, SparseBlock rows) throws IOException { //counter for non-zero elements - long nnz = 0; + long gnnz = 0; //read all individual sparse rows from input for( int i=0; i<rlen; i++ ) @@ -189,10 +189,14 @@ public class CacheDataInput implements DataInput, MatrixBlockDataInput _count+=12; } - nnz += lnnz; + gnnz += lnnz; } } + //sanity check valid number of read nnz + if( gnnz != nnz ) + throw new IOException("Invalid number of read nnz: "+gnnz+" vs "+nnz); + return nnz; } http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java index befccfe..9c59d4e 100644 --- a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java +++ b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java @@ -118,10 +118,11 @@ public abstract class MatrixReader protected static void sortSparseRowsParallel(MatrixBlock dest, long rlen, int k, ExecutorService pool) throws InterruptedException, ExecutionException { - //create sort tasks + //create sort tasks (increase number of tasks for better load balance) ArrayList<SortRowsTask> tasks = new ArrayList<SortRowsTask>(); - int blklen = (int)(Math.ceil((double)rlen/k)); - for( int i=0; i<k & i*blklen<rlen; i++ ) + int k2 = (int) Math.min(8*k, rlen); + int blklen = (int)(Math.ceil((double)rlen/k2)); + for( int i=0; i<k2 & i*blklen<rlen; i++ ) tasks.add(new SortRowsTask(dest, i*blklen, Math.min((i+1)*blklen, (int)rlen))); //execute parallel sort and check for errors http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java ---------------------------------------------------------------------- diff --git 
a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java index 0bca17d..e0c217a 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java @@ -33,6 +33,7 @@ import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; +import org.apache.sysml.runtime.matrix.data.SparseBlock; import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; @@ -89,19 +90,30 @@ public class ReaderBinaryBlock extends MatrixReader ArrayList<IndexedMatrixValue> ret = new ArrayList<IndexedMatrixValue>(); //prepare file access - JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - Path path = new Path( (_localFS ? "file:///" : "") + fname); + JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); + Path path = new Path( (_localFS ? 
"file:///" : "") + fname); FileSystem fs = IOUtilFunctions.getFileSystem(path, job); //check existence and non-empty file - checkValidInputFile(fs, path); + checkValidInputFile(fs, path); //core read readBinaryBlockMatrixBlocksFromHDFS(path, job, fs, ret, rlen, clen, brlen, bclen); return ret; } - + + protected static MatrixBlock getReuseBlock(int brlen, int bclen, boolean sparse) { + //note: we allocate the reuse block in CSR because this avoids unnecessary + //reallocations in the presence of a mix of sparse and ultra-sparse blocks, + //where ultra-sparse deserialization only reuses CSR blocks + MatrixBlock value = new MatrixBlock(brlen, bclen, sparse); + if( sparse ) { + value.allocateAndResetSparseRowsBlock(true, SparseBlock.Type.CSR); + value.getSparseBlock().allocate(0, 1024); + } + return value; + } /** @@ -125,13 +137,12 @@ public class ReaderBinaryBlock extends MatrixReader * @throws IOException if IOException occurs * @throws DMLRuntimeException if DMLRuntimeException occurs */ - @SuppressWarnings("deprecation") private static void readBinaryBlockMatrixFromHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock dest, long rlen, long clen, int brlen, int bclen ) throws IOException, DMLRuntimeException { boolean sparse = dest.isInSparseFormat(); MatrixIndexes key = new MatrixIndexes(); - MatrixBlock value = new MatrixBlock(); + MatrixBlock value = getReuseBlock(brlen, bclen, sparse); long lnnz = 0; //aggregate block nnz //set up preferred custom serialization framework for binary block format @@ -141,7 +152,8 @@ public class ReaderBinaryBlock extends MatrixReader for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ) //1..N files { //directly read from sequence files (individual partfiles) - SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job); + SequenceFile.Reader reader = new SequenceFile + .Reader(job, SequenceFile.Reader.file(lpath)); try { @@ -195,8 +207,7 @@ public class ReaderBinaryBlock extends MatrixReader 
dest.sortSparseRows(); } } - - @SuppressWarnings("deprecation") + private void readBinaryBlockMatrixBlocksFromHDFS( Path path, JobConf job, FileSystem fs, Collection<IndexedMatrixValue> dest, long rlen, long clen, int brlen, int bclen ) throws IOException { @@ -210,7 +221,8 @@ public class ReaderBinaryBlock extends MatrixReader for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ) //1..N files { //directly read from sequence files (individual partfiles) - SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job); + SequenceFile.Reader reader = new SequenceFile + .Reader(job, SequenceFile.Reader.file(lpath)); try { http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java index e7114d8..16260a8 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java @@ -90,7 +90,7 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock ExecutorService pool = Executors.newFixedThreadPool(_numThreads); ArrayList<ReadFileTask> tasks = new ArrayList<ReadFileTask>(); for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ){ - ReadFileTask t = new ReadFileTask(lpath, job, fs, dest, rlen, clen, brlen, bclen); + ReadFileTask t = new ReadFileTask(lpath, job, dest, rlen, clen, brlen, bclen); tasks.add(t); } @@ -118,17 +118,15 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock { private Path _path = null; private JobConf _job = null; - private FileSystem _fs = null; private MatrixBlock _dest = null; private long _rlen = -1; private long _clen = -1; private int _brlen = -1; private int _bclen = -1; - public ReadFileTask(Path 
path, JobConf job, FileSystem fs, MatrixBlock dest, long rlen, long clen, int brlen, int bclen) + public ReadFileTask(Path path, JobConf job, MatrixBlock dest, long rlen, long clen, int brlen, int bclen) { _path = path; - _fs = fs; _job = job; _dest = dest; _rlen = rlen; @@ -138,16 +136,16 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock } @Override - @SuppressWarnings({ "deprecation" }) public Object call() throws Exception { boolean sparse = _dest.isInSparseFormat(); MatrixIndexes key = new MatrixIndexes(); - MatrixBlock value = new MatrixBlock(); + MatrixBlock value = getReuseBlock(_brlen, _bclen, sparse); long lnnz = 0; //aggregate block nnz //directly read from sequence files (individual partfiles) - SequenceFile.Reader reader = new SequenceFile.Reader(_fs,_path,_job); + SequenceFile.Reader reader = new SequenceFile + .Reader(_job, SequenceFile.Reader.file(_path)); try { @@ -205,8 +203,7 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock lnnz += value.getNonZeros(); } } - finally - { + finally { IOUtilFunctions.closeSilently(reader); } http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index e9039ba..145eb97 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -736,7 +736,10 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab int len = b.size(i); int[] ix = b.indexes(i); double[] val = b.values(i); - sparseBlock.allocate(aix, sparseBlock.size(aix)+len); + if( estimatedNNzsPerRow > 0 ) + sparseBlock.allocate(aix, Math.max(estimatedNNzsPerRow, sparseBlock.size(aix)+len), clen); + else + 
sparseBlock.allocate(aix, sparseBlock.size(aix)+len); for( int j=pos; j<pos+len; j++ ) sparseBlock.append(aix, coloffset+ix[j], val[j]); } @@ -1894,39 +1897,36 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab } } } - + private void readSparseBlock(DataInput in) throws IOException { - allocateSparseRowsBlock(false); - resetSparse(); //reset all sparse rows + allocateSparseRowsBlock(false); + resetSparse(); - if( in instanceof MatrixBlockDataInput ) //fast deserialize - { + if( in instanceof MatrixBlockDataInput ) { //fast deserialize MatrixBlockDataInput mbin = (MatrixBlockDataInput)in; - nonZeros = mbin.readSparseRows(rlen, sparseBlock); + nonZeros = mbin.readSparseRows(rlen, nonZeros, sparseBlock); } - else if( in instanceof DataInputBuffer && MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION ) - { + else if( in instanceof DataInputBuffer && MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION ) { //workaround because sequencefile.reader.next(key, value) does not yet support serialization framework DataInputBuffer din = (DataInputBuffer)in; FastBufferedDataInputStream mbin = null; try { mbin = new FastBufferedDataInputStream(din); - nonZeros = mbin.readSparseRows(rlen, sparseBlock); + nonZeros = mbin.readSparseRows(rlen, nonZeros, sparseBlock); } finally { IOUtilFunctions.closeSilently(mbin); } } - else //default deserialize - { + else { //default deserialize for(int r=0; r<rlen; r++) { int rnnz = in.readInt(); //row nnz if( rnnz > 0 ) { sparseBlock.reset(r, rnnz, clen); for(int j=0; j<rnnz; j++) //col index/value pairs - sparseBlock.append(r, in.readInt(), in.readDouble()); + sparseBlock.append(r, in.readInt(), in.readDouble()); } } } http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlockDataInput.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlockDataInput.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlockDataInput.java index c673623..6fc36fb 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlockDataInput.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlockDataInput.java @@ -50,11 +50,12 @@ public interface MatrixBlockDataInput * Reads the sparse rows array from the data input into a sparse block * and returns the number of non-zeros. * - * @param rlen ? + * @param rlen number of rows + * @param nnz number of non-zeros * @param rows sparse block * @return number of non-zeros * @throws IOException if IOExcepton occurs */ - public long readSparseRows(int rlen, SparseBlock rows) + public long readSparseRows(int rlen, long nnz, SparseBlock rows) throws IOException; } http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java index a4e680d..bc817c7 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java @@ -176,13 +176,9 @@ public class SparseBlockCSR extends SparseBlock public void initUltraSparse(int nnz, DataInput in) throws IOException { - //ensure empty block before init - if( _size > 0 ) - reset(); - //allocate space if necessary if( _values.length < nnz ) - resize(nnz); + resize(newCapacity(nnz)); //read ijv triples, append and update pointers int rlast = 0; @@ -201,6 +197,37 @@ public class SparseBlockCSR extends SparseBlock } /** + * Initializes the CSR sparse block from an ordered input + * stream of sparse rows (rownnz, jv-pairs*). 
+ * + * @param rlen number of rows + * @param nnz number of non-zeros to read + * @param in data input stream of sparse rows, ordered by i + * @throws IOException if deserialization error occurs + */ + public void initSparse(int rlen, int nnz, DataInput in) + throws IOException + { + //allocate space if necessary + if( _values.length < nnz ) + resize(newCapacity(nnz)); + + //read sparse rows, append and update pointers + _ptr[0] = 0; + for( int r=0, pos=0; r<rlen; r++ ) { + int lnnz = in.readInt(); + for( int j=0; j<lnnz; j++, pos++ ) { + _indexes[pos] = in.readInt(); + _values[pos] = in.readDouble(); + } + _ptr[r+1] = pos; + } + + //update meta data + _size = nnz; + } + + /** * Get the estimated in-memory size of the sparse block in CSR * with the given dimensions w/o accounting for overallocation. * @@ -402,14 +429,14 @@ public class SparseBlockCSR extends SparseBlock if( _size==_values.length ) resize(); insert(_size, c, v); - } + } else { //resize, shift and insert if( _size==_values.length ) resizeAndInsert(pos+len, c, v); else shiftRightAndInsert(pos+len, c, v); - } + } incrPtr(r+1); } http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java index 1c160b2..4927906 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java @@ -147,10 +147,9 @@ public final class SparseRowVector extends SparseRow implements Serializable * @return new capacity for resizing */ private int newCapacity() { - if( values.length < estimatedNzs ) - return Math.min(estimatedNzs, values.length*2); - else - return (int) Math.min(maxNzs, Math.ceil((double)(values.length)*1.1)); + return 
(int) ((values.length < estimatedNzs) ? + Math.min(estimatedNzs, values.length*SparseBlock.RESIZE_FACTOR1) : + Math.min(maxNzs, Math.ceil(values.length*SparseBlock.RESIZE_FACTOR2))); } @Override http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataInputStream.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataInputStream.java b/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataInputStream.java index 932ceb1..197467d 100644 --- a/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataInputStream.java +++ b/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataInputStream.java @@ -27,6 +27,7 @@ import java.io.InputStream; import org.apache.sysml.runtime.matrix.data.MatrixBlockDataInput; import org.apache.sysml.runtime.matrix.data.SparseBlock; +import org.apache.sysml.runtime.matrix.data.SparseBlockCSR; public class FastBufferedDataInputStream extends FilterInputStream implements DataInput, MatrixBlockDataInput { @@ -34,23 +35,19 @@ public class FastBufferedDataInputStream extends FilterInputStream implements Da protected byte[] _buff; protected int _bufflen; - public FastBufferedDataInputStream( InputStream in ) - { + public FastBufferedDataInputStream( InputStream in ) { this(in, 8192); } - public FastBufferedDataInputStream( InputStream in, int size ) - { + public FastBufferedDataInputStream( InputStream in, int size ) { super(in); if (size <= 0) throw new IllegalArgumentException("Buffer size <= 0"); - _buff = new byte[ size ]; _bufflen = size; } - ///////////////////////////// // DataInput Implementation ///////////////////////////// @@ -169,8 +166,8 @@ public class FastBufferedDataInputStream extends FilterInputStream implements Da public long readDoubleArray(int len, double[] varr) throws IOException { - //if( len<=0 || len != varr.length ) - // throw new 
IndexOutOfBoundsException("len="+len+", varr.length="+varr.length); + if( len<=0 || len > varr.length ) + throw new IndexOutOfBoundsException("len="+len+", varr.length="+varr.length); //counter for non-zero elements long nnz = 0; @@ -198,11 +195,17 @@ public class FastBufferedDataInputStream extends FilterInputStream implements Da } @Override - public long readSparseRows(int rlen, SparseBlock rows) + public long readSparseRows(int rlen, long nnz, SparseBlock rows) throws IOException { + //check for CSR quick-path + if( rows instanceof SparseBlockCSR ) { + ((SparseBlockCSR) rows).initSparse(rlen, (int)nnz, this); + return nnz; + } + //counter for non-zero elements - long nnz = 0; + long gnnz = 0; //read all individual sparse rows from input for( int i=0; i<rlen; i++ ) @@ -241,10 +244,14 @@ public class FastBufferedDataInputStream extends FilterInputStream implements Da } } - nnz += lnnz; + gnnz += lnnz; } } + //sanity check valid number of read nnz + if( gnnz != nnz ) + throw new IOException("Invalid number of read nnz: "+gnnz+" vs "+nnz); + return nnz; }
