Repository: systemml
Updated Branches:
  refs/heads/master d753af90d -> bba7236fb


[SYSTEMML-2147] Performance read of sparse short-wide binary matrices

This patch improves the performance of reading sparse, short-and-wide
binary matrices with the multi-threaded reader. Previously, this reader
synchronized on the first row of each row of blocks, which led to thread
contention when the number of threads exceeded the number of row blocks.
We now preallocate all rows in such scenarios to allow more fine-grained,
per-row synchronization without hurting performance in the common case
of tall and skinny matrices.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/bba7236f
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/bba7236f
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/bba7236f

Branch: refs/heads/master
Commit: bba7236fb3de68f12961905544d2ceca7531fbf9
Parents: d753af9
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Tue Feb 13 00:00:44 2018 -0800
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Tue Feb 13 00:00:44 2018 -0800

----------------------------------------------------------------------
 .../apache/sysml/runtime/io/MatrixReader.java   | 10 ++-
 .../runtime/io/ReaderBinaryBlockParallel.java   | 44 ++++++----
 .../sysml/runtime/matrix/data/MatrixBlock.java  | 84 ++++++++++----------
 3 files changed, 77 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/bba7236f/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java 
b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
index cfdacf9..513a741 100644
--- a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
@@ -81,6 +81,8 @@ public abstract class MatrixReader
                
                //determine target representation (sparse/dense)
                boolean sparse = MatrixBlock.evalSparseFormatInMemory(rlen, 
clen, estnnz); 
+               int numThreads = 
OptimizerUtils.getParallelBinaryReadParallelism();
+               long numBlocks = (long)Math.ceil((double)rlen / brlen);
                
                //prepare result matrix block
                MatrixBlock ret = new MatrixBlock((int)rlen, (int)clen, sparse, 
estnnz);
@@ -94,8 +96,12 @@ public abstract class MatrixReader
                                && clen >= 0 && bclen > 0 && rlen >= 0 && brlen 
> 0 ) {  //all dims known
                                //note: allocate w/ min 2 nnz to ensure 
allocated row object because
                                //adaptive change from scalar to row could 
cause synchronization issues
-                               for( int i=0; i<rlen; i+=brlen )
-                                       sblock.allocate(i, 
Math.max((int)(estnnz/rlen),2), (int)clen);
+                               if( numThreads <= numBlocks )
+                                       for( int i=0; i<rlen; i+=brlen )
+                                               sblock.allocate(i, 
Math.max((int)(estnnz/rlen),2), (int)clen);
+                               else //allocate all rows to avoid contention
+                                       for( int i=0; i<rlen; i++ )
+                                               sblock.allocate(i, 
Math.max((int)(estnnz/rlen),2), (int)clen);
                        }
                }
                

http://git-wip-us.apache.org/repos/asf/systemml/blob/bba7236f/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
index e5c53dd..15d8a70 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
@@ -66,8 +66,11 @@ public class ReaderBinaryBlockParallel extends 
ReaderBinaryBlock
                //check existence and non-empty file
                checkValidInputFile(fs, path); 
                
-               //core read 
-               readBinaryBlockMatrixFromHDFS(path, job, fs, ret, rlen, clen, 
brlen, bclen);
+               //core read
+               int numThreads = 
OptimizerUtils.getParallelBinaryReadParallelism();
+               long numBlocks = (long)Math.ceil((double)rlen / brlen);
+               readBinaryBlockMatrixFromHDFS(path, job, fs, ret,
+                       rlen, clen, brlen, bclen, numThreads<=numBlocks);
                
                //finally check if change of sparse/dense block representation 
required
                if( !AGGREGATE_BLOCK_NNZ )
@@ -77,7 +80,8 @@ public class ReaderBinaryBlockParallel extends 
ReaderBinaryBlock
                return ret;
        }
 
-       private static void readBinaryBlockMatrixFromHDFS( Path path, JobConf 
job, FileSystem fs, MatrixBlock dest, long rlen, long clen, int brlen, int 
bclen )
+       private static void readBinaryBlockMatrixFromHDFS( Path path, JobConf 
job, FileSystem fs, MatrixBlock dest,
+                       long rlen, long clen, int brlen, int bclen, boolean 
syncBlock )
                throws IOException, DMLRuntimeException
        {
                //set up preferred custom serialization framework for binary 
block format
@@ -90,7 +94,7 @@ public class ReaderBinaryBlockParallel extends 
ReaderBinaryBlock
                        ExecutorService pool = 
Executors.newFixedThreadPool(_numThreads);
                        ArrayList<ReadFileTask> tasks = new ArrayList<>();
                        for( Path lpath : 
IOUtilFunctions.getSequenceFilePaths(fs, path) ){
-                               ReadFileTask t = new ReadFileTask(lpath, job, 
dest, rlen, clen, brlen, bclen);
+                               ReadFileTask t = new ReadFileTask(lpath, job, 
dest, rlen, clen, brlen, bclen, syncBlock);
                                tasks.add(t);
                        }
 
@@ -116,16 +120,14 @@ public class ReaderBinaryBlockParallel extends 
ReaderBinaryBlock
 
        private static class ReadFileTask implements Callable<Object> 
        {
-               private Path _path = null;
-               private JobConf _job = null;
-               private MatrixBlock _dest = null;
-               private long _rlen = -1;
-               private long _clen = -1;
-               private int _brlen = -1;
-               private int _bclen = -1;
+               private final Path _path;
+               private final JobConf _job;
+               private final MatrixBlock _dest;
+               private final long _rlen, _clen;
+               private final int _brlen, _bclen;
+               private final boolean _syncBlocks;
                
-               public ReadFileTask(Path path, JobConf job, MatrixBlock dest, 
long rlen, long clen, int brlen, int bclen)
-               {
+               public ReadFileTask(Path path, JobConf job, MatrixBlock dest, 
long rlen, long clen, int brlen, int bclen, boolean syncBlocks) {
                        _path = path;
                        _job = job;
                        _dest = dest;
@@ -133,6 +135,7 @@ public class ReaderBinaryBlockParallel extends 
ReaderBinaryBlock
                        _clen = clen;
                        _brlen = brlen;
                        _bclen = bclen;
+                       _syncBlocks = syncBlocks;
                }
 
                @Override
@@ -179,8 +182,16 @@ public class ReaderBinaryBlockParallel extends 
ReaderBinaryBlock
                                                        //NOTE: fine-grained 
locking depends on MCSR SparseRow objects 
                                                        SparseBlock sblock = 
_dest.getSparseBlock();
                                                        if( sblock instanceof 
SparseBlockMCSR && sblock.get(row_offset) != null ) {
-                                                               synchronized( 
sblock.get(row_offset) ){ 
-                                                                       
_dest.appendToSparse(value, row_offset, col_offset);
+                                                               if( _syncBlocks 
) {
+                                                                       
synchronized( sblock.get(row_offset) ){ 
+                                                                               
_dest.appendToSparse(value, row_offset, col_offset);
+                                                                       }
+                                                               }
+                                                               else {
+                                                                       for( 
int i=0; i<rows; i++ ) 
+                                                                               
synchronized( sblock.get(row_offset+i) ) {
+                                                                               
        _dest.appendRowToSparse(sblock, value, i, row_offset, col_offset, true);
+                                                                               
}
                                                                }
                                                        }
                                                        else {
@@ -193,8 +204,7 @@ public class ReaderBinaryBlockParallel extends 
ReaderBinaryBlock
                                                        
_dest.appendToSparse(value, row_offset, col_offset);
                                                }
                                        } 
-                                       else
-                                       {
+                                       else {
                                                _dest.copy( row_offset, 
row_offset+rows-1, 
                                                        col_offset, 
col_offset+cols-1, value, false );
                                        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/bba7236f/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index b10e153..6d38a91 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -708,58 +708,58 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                //init sparse rows if necessary
                allocateSparseRowsBlock(false);
                
-               if( that.sparse ) //SPARSE <- SPARSE
+               //append individual rows
+               int m2 = that.rlen;
+               for(int i=0; i<m2; i++)
+                       appendRowToSparse(sparseBlock, that, i, rowoffset, 
coloffset, deep);
+       }
+       
+       public void appendRowToSparse( SparseBlock dest, MatrixBlock src, int 
i, int rowoffset, int coloffset, boolean deep ) {
+               if( src.sparse ) //SPARSE <- SPARSE
                {
-                       SparseBlock a = that.sparseBlock;
-                       SparseBlock c = sparseBlock;
-                       for( int i=0; i<that.rlen; i++ )
-                       {
-                               if( a.isEmpty(i) ) continue;
-                               int aix = rowoffset+i;
-                               
-                               //single block append (avoid re-allocations)
-                               if( !c.isAllocated(aix) && coloffset==0 ) { 
-                                       //note: the deep copy flag is only 
relevant for MCSR due to
-                                       //shallow references of b.get(i); other 
block formats do not
-                                       //require a redundant copy because 
b.get(i) created a new row.
-                                       boolean ldeep = (deep && a instanceof 
SparseBlockMCSR);
-                                       c.set(aix, a.get(i), ldeep);
-                               }
-                               else { //general case
-                                       int pos = a.pos(i);
-                                       int len = a.size(i);
-                                       int[] ix = a.indexes(i);
-                                       double[] val = a.values(i);
-                                       if( estimatedNNzsPerRow > 0 )
-                                               c.allocate(aix, 
Math.max(estimatedNNzsPerRow, c.size(aix)+len), clen);
-                                       else
-                                               c.allocate(aix, 
c.size(aix)+len);
-                                       for( int j=pos; j<pos+len; j++ )
-                                               c.append(aix, coloffset+ix[j], 
val[j]);
-                               }
+                       SparseBlock a = src.sparseBlock;
+                       if( a.isEmpty(i) ) return;
+                       int aix = rowoffset+i;
+                       
+                       //single block append (avoid re-allocations)
+                       if( !dest.isAllocated(aix) && coloffset==0 ) { 
+                               //note: the deep copy flag is only relevant for 
MCSR due to
+                               //shallow references of b.get(i); other block 
formats do not
+                               //require a redundant copy because b.get(i) 
created a new row.
+                               boolean ldeep = (deep && a instanceof 
SparseBlockMCSR);
+                               dest.set(aix, a.get(i), ldeep);
+                       }
+                       else { //general case
+                               int pos = a.pos(i);
+                               int len = a.size(i);
+                               int[] ix = a.indexes(i);
+                               double[] val = a.values(i);
+                               if( estimatedNNzsPerRow > 0 )
+                                       dest.allocate(aix, 
Math.max(estimatedNNzsPerRow, dest.size(aix)+len), clen);
+                               else
+                                       dest.allocate(aix, dest.size(aix)+len);
+                               for( int j=pos; j<pos+len; j++ )
+                                       dest.append(aix, coloffset+ix[j], 
val[j]);
                        }
                }
                else //SPARSE <- DENSE
                {
-                       DenseBlock a = that.getDenseBlock();
-                       SparseBlock c = getSparseBlock();
-                       final int m2 = that.rlen;
-                       final int n2 = that.clen;
-                       for( int i=0; i<m2; i++ ) {
-                               double[] avals = a.values(i);
-                               int aix = a.pos(i);
-                               int cix = rowoffset + i;
-                               for( int j=0; j<n2; j++ ) {
-                                       double bval = avals[aix+j];
-                                       if( bval != 0 ) {
-                                               c.allocate(cix, 
estimatedNNzsPerRow, clen);
-                                               c.append(cix, coloffset+j, 
bval);
-                                       }
+                       DenseBlock a = src.getDenseBlock();
+                       final int n2 = src.clen;
+                       double[] avals = a.values(i);
+                       int aix = a.pos(i);
+                       int cix = rowoffset + i;
+                       for( int j=0; j<n2; j++ ) {
+                               double bval = avals[aix+j];
+                               if( bval != 0 ) {
+                                       dest.allocate(cix, estimatedNNzsPerRow, 
clen);
+                                       dest.append(cix, coloffset+j, bval);
                                }
                        }
                }
        }
        
+       
        /**
         * Sorts all existing sparse rows by column indexes.
         */

Reply via email to