[SYSTEMML-552] Performance of parallel reader for sparse binary block matrices Reading a sparse binary block matrix into CP is realized via append in order to avoid repeated shifting of values to maintain the sorted order. For matrices with multiple column blocks, a parallel read requires locking during append and a final sort of the sparse rows. This patch improves performance by (1) sorting sparse rows in parallel, and (2) fine-grained locking (synchronization) per row block instead of locking the entire matrix. In a scenario with a 100k x 100k matrix with sparsity sp=0.01 (with k=16 vcores), this led to an improvement from 13.1s to 8.9s (w/ parallel sorting only) and to 4.1s (w/ both features).
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/41ca1d16 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/41ca1d16 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/41ca1d16 Branch: refs/heads/master Commit: 41ca1d163c335156afaf491df21435953d04c1d4 Parents: 0089c3d Author: Matthias Boehm <[email protected]> Authored: Mon Mar 7 12:22:23 2016 -0800 Committer: Matthias Boehm <[email protected]> Committed: Mon Mar 7 12:23:29 2016 -0800 ---------------------------------------------------------------------- .../apache/sysml/runtime/io/MatrixReader.java | 15 ++++- .../sysml/runtime/io/ReaderBinaryBlock.java | 2 +- .../runtime/io/ReaderBinaryBlockParallel.java | 64 ++++++++++++++++---- .../sysml/runtime/io/ReaderBinaryCell.java | 2 +- .../apache/sysml/runtime/io/ReaderTextCSV.java | 2 +- .../sysml/runtime/io/ReaderTextCSVParallel.java | 2 +- .../apache/sysml/runtime/io/ReaderTextCell.java | 4 +- .../runtime/io/ReaderTextCellParallel.java | 2 +- 8 files changed, 71 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java index ccd0aee..d8a5c2d 100644 --- a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java +++ b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java @@ -26,10 +26,11 @@ import java.util.LinkedList; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; - import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.DMLRuntimeException; import 
org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.SparseBlock; +import org.apache.sysml.runtime.matrix.data.SparseBlockMCSR; import org.apache.sysml.runtime.util.MapReduceTool; /** @@ -98,7 +99,7 @@ public abstract class MatrixReader * @throws DMLRuntimeException * @throws IOException */ - protected static MatrixBlock createOutputMatrixBlock( long rlen, long clen, long estnnz, boolean mallocDense, boolean mallocSparse ) + protected static MatrixBlock createOutputMatrixBlock( long rlen, long clen, int bclen, int brlen, long estnnz, boolean mallocDense, boolean mallocSparse ) throws IOException, DMLRuntimeException { //check input dimension @@ -112,8 +113,16 @@ public abstract class MatrixReader MatrixBlock ret = new MatrixBlock((int)rlen, (int)clen, sparse, estnnz); if( !sparse && mallocDense ) ret.allocateDenseBlockUnsafe((int)rlen, (int)clen); - else if( sparse && mallocSparse ) + else if( sparse && mallocSparse ) { ret.allocateSparseRowsBlock(); + SparseBlock sblock = ret.getSparseBlock(); + //create synchronization points for MCSR (start row per block row) + if( sblock instanceof SparseBlockMCSR && clen > bclen //multiple col blocks + && clen > 0 && bclen > 0 && rlen > 0 && brlen > 0 ) { //all dims known + for( int i=0; i<rlen; i+=brlen ) + ret.getSparseBlock().allocate(i, Math.min((int)(estnnz/rlen),1), (int)clen); + } + } return ret; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java index 6493abb..03e42f4 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java @@ -53,7 +53,7 @@ public class ReaderBinaryBlock extends 
MatrixReader throws IOException, DMLRuntimeException { //allocate output matrix block - MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, false, false); + MatrixBlock ret = createOutputMatrixBlock(rlen, clen, brlen, bclen, estnnz, false, false); //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java index 03f17ac..cdd7b33 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java @@ -31,12 +31,13 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.mapred.JobConf; - import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; +import org.apache.sysml.runtime.matrix.data.SparseBlock; +import org.apache.sysml.runtime.matrix.data.SparseBlockMCSR; import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration; @@ -55,7 +56,7 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock throws IOException, DMLRuntimeException { //allocate output matrix block (incl block allocation for parallel) - MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, true, true); + MatrixBlock ret = createOutputMatrixBlock(rlen, clen, brlen, bclen, estnnz, true, true); //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); @@ 
-112,7 +113,6 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock //wait until all tasks have been executed List<Future<Object>> rt = pool.invokeAll(tasks); - pool.shutdown(); //check for exceptions and aggregate nnz long lnnz = 0; @@ -121,10 +121,16 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock //post-processing dest.setNonZeros( lnnz ); - if( dest.isInSparseFormat() && clen>bclen ){ - //no need to sort if 1 column block since always sorted - dest.sortSparseRows(); - } + if( dest.isInSparseFormat() && clen>bclen ) { + //need to sort if multiple column block; otherwise always sorted + ArrayList<SortRowsTask> tasks2 = new ArrayList<SortRowsTask>(); + int blklen = (int)(Math.ceil((double)rlen/_numThreads)); + for( int i=0; i<_numThreads & i*blklen<rlen; i++ ) + tasks2.add(new SortRowsTask(dest, i*blklen, Math.min((i+1)*blklen, (int)rlen))); + pool.invokeAll(tasks2); + } + + pool.shutdown(); } catch (Exception e) { throw new IOException("Failed parallel read of binary block input.", e); @@ -196,18 +202,29 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock { //note: append requires final sort if (cols < _clen ) { - synchronized( _dest ){ //sparse requires lock, when matrix is wider than one block - _dest.appendToSparse(value, row_offset, col_offset); + //sparse requires lock, when matrix is wider than one block + //(fine-grained locking of block rows instead of the entire matrix) + //NOTE: fine-grained locking depends on MCSR SparseRow objects + SparseBlock sblock = _dest.getSparseBlock(); + if( sblock instanceof SparseBlockMCSR && sblock.get(row_offset) != null ) { + synchronized( sblock.get(row_offset) ){ + _dest.appendToSparse(value, row_offset, col_offset); + } + } + else { + synchronized( _dest ){ + _dest.appendToSparse(value, row_offset, col_offset); + } } } - else + else { //quickpath (no synchronization) _dest.appendToSparse(value, row_offset, col_offset); + } } else { _dest.copy( row_offset, 
row_offset+rows-1, - col_offset, col_offset+cols-1, - value, false ); + col_offset, col_offset+cols-1, value, false ); } //aggregate nnz @@ -222,4 +239,27 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock return lnnz; } } + + /** + * + */ + private static class SortRowsTask implements Callable<Object> + { + private MatrixBlock _dest = null; + private int _rl = -1; + private int _ru = -1; + + public SortRowsTask(MatrixBlock dest, int rl, int ru) { + _dest = dest; + _rl = rl; + _ru = ru; + } + + @Override + public Object call() throws Exception { + for( int i=_rl; i<_ru; i++ ) + _dest.getSparseBlock().sort(i); + return null; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java index 581b9ec..2ab4554 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java @@ -44,7 +44,7 @@ public class ReaderBinaryCell extends MatrixReader throws IOException, DMLRuntimeException { //allocate output matrix block - MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, true, false); + MatrixBlock ret = createOutputMatrixBlock(rlen, clen, (int)rlen, (int)clen, estnnz, true, false); //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java index ebb24dd..3fefb56 100644 --- 
a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java @@ -57,7 +57,7 @@ public class ReaderTextCSV extends MatrixReader //allocate output matrix block MatrixBlock ret = null; if( rlen>0 && clen>0 ) //otherwise CSV reblock based on file size for matrix w/ unknown dimensions - ret = createOutputMatrixBlock(rlen, clen, estnnz, true, false); + ret = createOutputMatrixBlock(rlen, clen, (int)rlen, (int)clen, estnnz, true, false); //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java index 24a5db1..64c055c 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java @@ -248,7 +248,7 @@ public class ReaderTextCSVParallel extends MatrixReader // allocate target matrix block based on given size; // need to allocate sparse as well since lock-free insert into target - return createOutputMatrixBlock(nrow, ncol, estnnz, true, true); + return createOutputMatrixBlock(nrow, ncol, nrow, ncol, estnnz, true, true); } /** http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java index 0dce052..15e7caf 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java 
@@ -56,7 +56,7 @@ public class ReaderTextCell extends MatrixReader throws IOException, DMLRuntimeException { //allocate output matrix block - MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, true, false); + MatrixBlock ret = createOutputMatrixBlock(rlen, clen, (int)rlen, (int)clen, estnnz, true, false); //prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); @@ -84,7 +84,7 @@ public class ReaderTextCell extends MatrixReader throws IOException, DMLRuntimeException { //allocate output matrix block - MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, true, false); + MatrixBlock ret = createOutputMatrixBlock(rlen, clen, brlen, bclen, estnnz, true, false); //core read readRawTextCellMatrixFromInputStream(is, ret, rlen, clen, brlen, bclen, _isMMFile); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java index 309f5b7..d35ff7c 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java @@ -89,7 +89,7 @@ public class ReaderTextCellParallel extends MatrixReader checkValidInputFile(fs, path); //allocate output matrix block - MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, true, false); + MatrixBlock ret = createOutputMatrixBlock(rlen, clen, (int)rlen, (int)clen, estnnz, true, false); //core read readTextCellMatrixFromHDFS(path, job, ret, rlen, clen, brlen, bclen, _isMMFile);
