[SYSTEMML-552] Performance of parallel reader for sparse binary block matrices

Reading a sparse binary block matrix into CP is implemented via append,
which avoids repeated shifting of entries to maintain sorted order. For
matrices with multiple column blocks, a parallel read requires locking
during append and a final sort of the sparse rows. This patch improves
performance by (1) sorting sparse rows in parallel, and (2) fine-grained
locking (synchronization) per row block instead of locking the entire
matrix. On a 100k x 100k matrix with sparsity 0.01 (k=16 vcores), this
reduced read time from 13.1s to 8.9s (with parallel sorting only) and to
4.1s (with both changes).

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/41ca1d16
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/41ca1d16
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/41ca1d16

Branch: refs/heads/master
Commit: 41ca1d163c335156afaf491df21435953d04c1d4
Parents: 0089c3d
Author: Matthias Boehm <[email protected]>
Authored: Mon Mar 7 12:22:23 2016 -0800
Committer: Matthias Boehm <[email protected]>
Committed: Mon Mar 7 12:23:29 2016 -0800

----------------------------------------------------------------------
 .../apache/sysml/runtime/io/MatrixReader.java   | 15 ++++-
 .../sysml/runtime/io/ReaderBinaryBlock.java     |  2 +-
 .../runtime/io/ReaderBinaryBlockParallel.java   | 64 ++++++++++++++++----
 .../sysml/runtime/io/ReaderBinaryCell.java      |  2 +-
 .../apache/sysml/runtime/io/ReaderTextCSV.java  |  2 +-
 .../sysml/runtime/io/ReaderTextCSVParallel.java |  2 +-
 .../apache/sysml/runtime/io/ReaderTextCell.java |  4 +-
 .../runtime/io/ReaderTextCellParallel.java      |  2 +-
 8 files changed, 71 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java 
b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
index ccd0aee..d8a5c2d 100644
--- a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
@@ -26,10 +26,11 @@ import java.util.LinkedList;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.SparseBlock;
+import org.apache.sysml.runtime.matrix.data.SparseBlockMCSR;
 import org.apache.sysml.runtime.util.MapReduceTool;
 
 /**
@@ -98,7 +99,7 @@ public abstract class MatrixReader
         * @throws DMLRuntimeException 
         * @throws IOException 
         */
-       protected static MatrixBlock createOutputMatrixBlock( long rlen, long 
clen, long estnnz, boolean mallocDense, boolean mallocSparse ) 
+       protected static MatrixBlock createOutputMatrixBlock( long rlen, long 
clen, int bclen, int brlen, long estnnz, boolean mallocDense, boolean 
mallocSparse ) 
                throws IOException, DMLRuntimeException
        {
                //check input dimension
@@ -112,8 +113,16 @@ public abstract class MatrixReader
                MatrixBlock ret = new MatrixBlock((int)rlen, (int)clen, sparse, 
estnnz);
                if( !sparse && mallocDense )
                        ret.allocateDenseBlockUnsafe((int)rlen, (int)clen);
-               else if( sparse && mallocSparse  )
+               else if( sparse && mallocSparse  ) {
                        ret.allocateSparseRowsBlock();
+                       SparseBlock sblock = ret.getSparseBlock();
+                       //create synchronization points for MCSR (start row per 
block row)
+                       if( sblock instanceof SparseBlockMCSR && clen > bclen   
   //multiple col blocks 
+                               && clen > 0 && bclen > 0 && rlen > 0 && brlen > 
0 ) {  //all dims known
+                               for( int i=0; i<rlen; i+=brlen )
+                                       ret.getSparseBlock().allocate(i, 
Math.min((int)(estnnz/rlen),1), (int)clen);
+                       }
+               }
                
                return ret;
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
index 6493abb..03e42f4 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
@@ -53,7 +53,7 @@ public class ReaderBinaryBlock extends MatrixReader
                throws IOException, DMLRuntimeException 
        {
                //allocate output matrix block
-               MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, 
false, false);
+               MatrixBlock ret = createOutputMatrixBlock(rlen, clen, brlen, 
bclen, estnnz, false, false);
                
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
index 03f17ac..cdd7b33 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
@@ -31,12 +31,13 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapred.JobConf;
-
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.SparseBlock;
+import org.apache.sysml.runtime.matrix.data.SparseBlockMCSR;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
 
 
@@ -55,7 +56,7 @@ public class ReaderBinaryBlockParallel extends 
ReaderBinaryBlock
                throws IOException, DMLRuntimeException 
        {       
                //allocate output matrix block (incl block allocation for 
parallel)
-               MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, 
true, true);
+               MatrixBlock ret = createOutputMatrixBlock(rlen, clen, brlen, 
bclen, estnnz, true, true);
                
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
@@ -112,7 +113,6 @@ public class ReaderBinaryBlockParallel extends 
ReaderBinaryBlock
 
                        //wait until all tasks have been executed
                        List<Future<Object>> rt = pool.invokeAll(tasks);        
-                       pool.shutdown();
                        
                        //check for exceptions and aggregate nnz
                        long lnnz = 0;
@@ -121,10 +121,16 @@ public class ReaderBinaryBlockParallel extends 
ReaderBinaryBlock
                        
                        //post-processing
                        dest.setNonZeros( lnnz );
-                       if( dest.isInSparseFormat() && clen>bclen ){
-                               //no need to sort if 1 column block since 
always sorted
-                               dest.sortSparseRows();
-                       }                       
+                       if( dest.isInSparseFormat() && clen>bclen ) {
+                               //need to sort if multiple column block; 
otherwise always sorted
+                               ArrayList<SortRowsTask> tasks2 = new 
ArrayList<SortRowsTask>();
+                               int blklen = 
(int)(Math.ceil((double)rlen/_numThreads));
+                               for( int i=0; i<_numThreads & i*blklen<rlen; 
i++ )
+                                       tasks2.add(new SortRowsTask(dest, 
i*blklen, Math.min((i+1)*blklen, (int)rlen)));
+                               pool.invokeAll(tasks2);
+                       }
+                       
+                       pool.shutdown();
                } 
                catch (Exception e) {
                        throw new IOException("Failed parallel read of binary 
block input.", e);
@@ -196,18 +202,29 @@ public class ReaderBinaryBlockParallel extends 
ReaderBinaryBlock
                                        {
                                                //note: append requires final 
sort
                                                if (cols < _clen ) {
-                                                       synchronized( _dest ){ 
//sparse requires lock, when matrix is wider than one block
-                                                               
_dest.appendToSparse(value, row_offset, col_offset);
+                                                       //sparse requires lock, 
when matrix is wider than one block
+                                                       //(fine-grained locking 
of block rows instead of the entire matrix)
+                                                       //NOTE: fine-grained 
locking depends on MCSR SparseRow objects 
+                                                       SparseBlock sblock = 
_dest.getSparseBlock();
+                                                       if( sblock instanceof 
SparseBlockMCSR && sblock.get(row_offset) != null ) {
+                                                               synchronized( 
sblock.get(row_offset) ){ 
+                                                                       
_dest.appendToSparse(value, row_offset, col_offset);
+                                                               }
+                                                       }
+                                                       else {
+                                                               synchronized( 
_dest ){ 
+                                                                       
_dest.appendToSparse(value, row_offset, col_offset);
+                                                               }
                                                        }
                                                }
-                                               else
+                                               else { //quickpath (no 
synchronization)
                                                        
_dest.appendToSparse(value, row_offset, col_offset);
+                                               }
                                        } 
                                        else
                                        {
                                                _dest.copy( row_offset, 
row_offset+rows-1, 
-                                                                  col_offset, 
col_offset+cols-1,
-                                                                  value, false 
);
+                                                                  col_offset, 
col_offset+cols-1, value, false );
                                        }
                                        
                                        //aggregate nnz
@@ -222,4 +239,27 @@ public class ReaderBinaryBlockParallel extends 
ReaderBinaryBlock
                        return lnnz;
                }
        }
+       
+       /**
+        * 
+        */
+       private static class SortRowsTask implements Callable<Object> 
+       {
+               private MatrixBlock _dest = null;
+               private int _rl = -1;
+               private int _ru = -1;
+               
+               public SortRowsTask(MatrixBlock dest, int rl, int ru) {
+                       _dest = dest;
+                       _rl = rl;
+                       _ru = ru;
+               }
+
+               @Override
+               public Object call() throws Exception {
+                       for( int i=_rl; i<_ru; i++ )
+                               _dest.getSparseBlock().sort(i);
+                       return null;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
index 581b9ec..2ab4554 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
@@ -44,7 +44,7 @@ public class ReaderBinaryCell extends MatrixReader
                throws IOException, DMLRuntimeException 
        {
                //allocate output matrix block
-               MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, 
true, false);
+               MatrixBlock ret = createOutputMatrixBlock(rlen, clen, 
(int)rlen, (int)clen, estnnz, true, false);
                
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
index ebb24dd..3fefb56 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
@@ -57,7 +57,7 @@ public class ReaderTextCSV extends MatrixReader
                //allocate output matrix block
                MatrixBlock ret = null;
                if( rlen>0 && clen>0 ) //otherwise CSV reblock based on file 
size for matrix w/ unknown dimensions
-                       ret = createOutputMatrixBlock(rlen, clen, estnnz, true, 
false);
+                       ret = createOutputMatrixBlock(rlen, clen, (int)rlen, 
(int)clen, estnnz, true, false);
                
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
index 24a5db1..64c055c 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
@@ -248,7 +248,7 @@ public class ReaderTextCSVParallel extends MatrixReader
 
                // allocate target matrix block based on given size; 
                // need to allocate sparse as well since lock-free insert into 
target
-               return createOutputMatrixBlock(nrow, ncol, estnnz, true, true);
+               return createOutputMatrixBlock(nrow, ncol, nrow, ncol, estnnz, 
true, true);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
index 0dce052..15e7caf 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
@@ -56,7 +56,7 @@ public class ReaderTextCell extends MatrixReader
                throws IOException, DMLRuntimeException 
        {
                //allocate output matrix block
-               MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, 
true, false);
+               MatrixBlock ret = createOutputMatrixBlock(rlen, clen, 
(int)rlen, (int)clen, estnnz, true, false);
                
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
@@ -84,7 +84,7 @@ public class ReaderTextCell extends MatrixReader
                throws IOException, DMLRuntimeException 
        {
                //allocate output matrix block
-               MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, 
true, false);
+               MatrixBlock ret = createOutputMatrixBlock(rlen, clen, brlen, 
bclen, estnnz, true, false);
        
                //core read 
                readRawTextCellMatrixFromInputStream(is, ret, rlen, clen, 
brlen, bclen, _isMMFile);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
index 309f5b7..d35ff7c 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
@@ -89,7 +89,7 @@ public class ReaderTextCellParallel extends MatrixReader
                checkValidInputFile(fs, path);
                
                //allocate output matrix block
-               MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, 
true, false);
+               MatrixBlock ret = createOutputMatrixBlock(rlen, clen, 
(int)rlen, (int)clen, estnnz, true, false);
        
                //core read 
                readTextCellMatrixFromHDFS(path, job, ret, rlen, clen, brlen, 
bclen, _isMMFile);

Reply via email to