Repository: systemml Updated Branches: refs/heads/master d753af90d -> bba7236fb
[SYSTEMML-2147] Performance read of sparse short-wide binary matrices This patch improves the performance of reading sparse, short-and-wide binary matrices with the multi-threaded reader. This reader synchronized on the first row of each row block, which led to thread contention whenever the number of threads exceeded the number of row blocks. We now preallocate all rows in such scenarios to allow more fine-grained synchronization without hurting performance for the common case of tall and skinny matrices. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/bba7236f Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/bba7236f Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/bba7236f Branch: refs/heads/master Commit: bba7236fb3de68f12961905544d2ceca7531fbf9 Parents: d753af9 Author: Matthias Boehm <[email protected]> Authored: Tue Feb 13 00:00:44 2018 -0800 Committer: Matthias Boehm <[email protected]> Committed: Tue Feb 13 00:00:44 2018 -0800 ---------------------------------------------------------------------- .../apache/sysml/runtime/io/MatrixReader.java | 10 ++- .../runtime/io/ReaderBinaryBlockParallel.java | 44 ++++++---- .../sysml/runtime/matrix/data/MatrixBlock.java | 84 ++++++++++---------- 3 files changed, 77 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/bba7236f/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java index cfdacf9..513a741 100644 --- a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java +++ b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java @@ -81,6 +81,8 @@ public abstract class MatrixReader //determine target representation (sparse/dense) 
boolean sparse = MatrixBlock.evalSparseFormatInMemory(rlen, clen, estnnz); + int numThreads = OptimizerUtils.getParallelBinaryReadParallelism(); + long numBlocks = (long)Math.ceil((double)rlen / brlen); //prepare result matrix block MatrixBlock ret = new MatrixBlock((int)rlen, (int)clen, sparse, estnnz); @@ -94,8 +96,12 @@ public abstract class MatrixReader && clen >= 0 && bclen > 0 && rlen >= 0 && brlen > 0 ) { //all dims known //note: allocate w/ min 2 nnz to ensure allocated row object because //adaptive change from scalar to row could cause synchronization issues - for( int i=0; i<rlen; i+=brlen ) - sblock.allocate(i, Math.max((int)(estnnz/rlen),2), (int)clen); + if( numThreads <= numBlocks ) + for( int i=0; i<rlen; i+=brlen ) + sblock.allocate(i, Math.max((int)(estnnz/rlen),2), (int)clen); + else //allocate all rows to avoid contention + for( int i=0; i<rlen; i++ ) + sblock.allocate(i, Math.max((int)(estnnz/rlen),2), (int)clen); } } http://git-wip-us.apache.org/repos/asf/systemml/blob/bba7236f/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java index e5c53dd..15d8a70 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java @@ -66,8 +66,11 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock //check existence and non-empty file checkValidInputFile(fs, path); - //core read - readBinaryBlockMatrixFromHDFS(path, job, fs, ret, rlen, clen, brlen, bclen); + //core read + int numThreads = OptimizerUtils.getParallelBinaryReadParallelism(); + long numBlocks = (long)Math.ceil((double)rlen / brlen); + readBinaryBlockMatrixFromHDFS(path, job, fs, ret, + rlen, clen, brlen, bclen, numThreads<=numBlocks); 
//finally check if change of sparse/dense block representation required if( !AGGREGATE_BLOCK_NNZ ) @@ -77,7 +80,8 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock return ret; } - private static void readBinaryBlockMatrixFromHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock dest, long rlen, long clen, int brlen, int bclen ) + private static void readBinaryBlockMatrixFromHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock dest, + long rlen, long clen, int brlen, int bclen, boolean syncBlock ) throws IOException, DMLRuntimeException { //set up preferred custom serialization framework for binary block format @@ -90,7 +94,7 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock ExecutorService pool = Executors.newFixedThreadPool(_numThreads); ArrayList<ReadFileTask> tasks = new ArrayList<>(); for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ){ - ReadFileTask t = new ReadFileTask(lpath, job, dest, rlen, clen, brlen, bclen); + ReadFileTask t = new ReadFileTask(lpath, job, dest, rlen, clen, brlen, bclen, syncBlock); tasks.add(t); } @@ -116,16 +120,14 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock private static class ReadFileTask implements Callable<Object> { - private Path _path = null; - private JobConf _job = null; - private MatrixBlock _dest = null; - private long _rlen = -1; - private long _clen = -1; - private int _brlen = -1; - private int _bclen = -1; + private final Path _path; + private final JobConf _job; + private final MatrixBlock _dest; + private final long _rlen, _clen; + private final int _brlen, _bclen; + private final boolean _syncBlocks; - public ReadFileTask(Path path, JobConf job, MatrixBlock dest, long rlen, long clen, int brlen, int bclen) - { + public ReadFileTask(Path path, JobConf job, MatrixBlock dest, long rlen, long clen, int brlen, int bclen, boolean syncBlocks) { _path = path; _job = job; _dest = dest; @@ -133,6 +135,7 @@ public class ReaderBinaryBlockParallel 
extends ReaderBinaryBlock _clen = clen; _brlen = brlen; _bclen = bclen; + _syncBlocks = syncBlocks; } @Override @@ -179,8 +182,16 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock //NOTE: fine-grained locking depends on MCSR SparseRow objects SparseBlock sblock = _dest.getSparseBlock(); if( sblock instanceof SparseBlockMCSR && sblock.get(row_offset) != null ) { - synchronized( sblock.get(row_offset) ){ - _dest.appendToSparse(value, row_offset, col_offset); + if( _syncBlocks ) { + synchronized( sblock.get(row_offset) ){ + _dest.appendToSparse(value, row_offset, col_offset); + } + } + else { + for( int i=0; i<rows; i++ ) + synchronized( sblock.get(row_offset+i) ) { + _dest.appendRowToSparse(sblock, value, i, row_offset, col_offset, true); + } } } else { @@ -193,8 +204,7 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock _dest.appendToSparse(value, row_offset, col_offset); } } - else - { + else { _dest.copy( row_offset, row_offset+rows-1, col_offset, col_offset+cols-1, value, false ); } http://git-wip-us.apache.org/repos/asf/systemml/blob/bba7236f/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index b10e153..6d38a91 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -708,58 +708,58 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab //init sparse rows if necessary allocateSparseRowsBlock(false); - if( that.sparse ) //SPARSE <- SPARSE + //append individual rows + int m2 = that.rlen; + for(int i=0; i<m2; i++) + appendRowToSparse(sparseBlock, that, i, rowoffset, coloffset, deep); + } + + public void appendRowToSparse( SparseBlock dest, MatrixBlock src, int i, 
int rowoffset, int coloffset, boolean deep ) { + if( src.sparse ) //SPARSE <- SPARSE { - SparseBlock a = that.sparseBlock; - SparseBlock c = sparseBlock; - for( int i=0; i<that.rlen; i++ ) - { - if( a.isEmpty(i) ) continue; - int aix = rowoffset+i; - - //single block append (avoid re-allocations) - if( !c.isAllocated(aix) && coloffset==0 ) { - //note: the deep copy flag is only relevant for MCSR due to - //shallow references of b.get(i); other block formats do not - //require a redundant copy because b.get(i) created a new row. - boolean ldeep = (deep && a instanceof SparseBlockMCSR); - c.set(aix, a.get(i), ldeep); - } - else { //general case - int pos = a.pos(i); - int len = a.size(i); - int[] ix = a.indexes(i); - double[] val = a.values(i); - if( estimatedNNzsPerRow > 0 ) - c.allocate(aix, Math.max(estimatedNNzsPerRow, c.size(aix)+len), clen); - else - c.allocate(aix, c.size(aix)+len); - for( int j=pos; j<pos+len; j++ ) - c.append(aix, coloffset+ix[j], val[j]); - } + SparseBlock a = src.sparseBlock; + if( a.isEmpty(i) ) return; + int aix = rowoffset+i; + + //single block append (avoid re-allocations) + if( !dest.isAllocated(aix) && coloffset==0 ) { + //note: the deep copy flag is only relevant for MCSR due to + //shallow references of b.get(i); other block formats do not + //require a redundant copy because b.get(i) created a new row. 
+ boolean ldeep = (deep && a instanceof SparseBlockMCSR); + dest.set(aix, a.get(i), ldeep); + } + else { //general case + int pos = a.pos(i); + int len = a.size(i); + int[] ix = a.indexes(i); + double[] val = a.values(i); + if( estimatedNNzsPerRow > 0 ) + dest.allocate(aix, Math.max(estimatedNNzsPerRow, dest.size(aix)+len), clen); + else + dest.allocate(aix, dest.size(aix)+len); + for( int j=pos; j<pos+len; j++ ) + dest.append(aix, coloffset+ix[j], val[j]); } } else //SPARSE <- DENSE { - DenseBlock a = that.getDenseBlock(); - SparseBlock c = getSparseBlock(); - final int m2 = that.rlen; - final int n2 = that.clen; - for( int i=0; i<m2; i++ ) { - double[] avals = a.values(i); - int aix = a.pos(i); - int cix = rowoffset + i; - for( int j=0; j<n2; j++ ) { - double bval = avals[aix+j]; - if( bval != 0 ) { - c.allocate(cix, estimatedNNzsPerRow, clen); - c.append(cix, coloffset+j, bval); - } + DenseBlock a = src.getDenseBlock(); + final int n2 = src.clen; + double[] avals = a.values(i); + int aix = a.pos(i); + int cix = rowoffset + i; + for( int j=0; j<n2; j++ ) { + double bval = avals[aix+j]; + if( bval != 0 ) { + dest.allocate(cix, estimatedNNzsPerRow, clen); + dest.append(cix, coloffset+j, bval); } } } } + /** * Sorts all existing sparse rows by column indexes. */
