Repository: systemml
Updated Branches:
  refs/heads/master 375ad1a14 -> 44a1a67df


[SYSTEMML-2460] Performance matrix market reader sparse/ultra-sparse

This patch improves the parallel textcell and matrix market reader for
sparse and ultra-sparse inputs. Especially for matrix market with
pattern matrices (that do not require expensive double parsing), garbage
collection overheads often dominate. We now make an additional parallel
pass over the input to determine the nnz per row for preallocation.

On the following graph datasets, this patch improved the singlenode read
performance by almost 3x (but these runtimes are still mostly dominated by
garbage collection):

a) germany_osm (11M x 11M, 24M non-zeros): 27.9s -> 10s
b) uk-2005 (39M x 39M, 936M non-zeros): 332s -> 120.1s


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/aca62baa
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/aca62baa
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/aca62baa

Branch: refs/heads/master
Commit: aca62baa1ca2de5aaeb30b9becf5fcea26247411
Parents: 375ad1a
Author: Matthias Boehm <[email protected]>
Authored: Sat Jul 21 00:07:21 2018 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Sat Jul 21 00:07:21 2018 -0700

----------------------------------------------------------------------
 .../runtime/io/ReaderTextCellParallel.java      | 74 +++++++++++++++++---
 1 file changed, 65 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/aca62baa/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
index de948f3..864dfda 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
@@ -20,11 +20,12 @@
 package org.apache.sysml.runtime.io;
 
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -40,9 +41,11 @@ import org.apache.sysml.runtime.matrix.data.DenseBlock;
 import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.SparseBlock;
 import org.apache.sysml.runtime.util.CommonThreadPool;
 import org.apache.sysml.runtime.util.FastStringTokenizer;
 import org.apache.sysml.runtime.util.MapReduceTool;
+import org.apache.sysml.runtime.util.UtilFunctions;
 
 /**
  * Parallel version of ReaderTextCell.java. To summarize, we create read tasks 
per split
@@ -91,21 +94,34 @@ public class ReaderTextCellParallel extends ReaderTextCell
                
                try 
                {
-                       //create read tasks for all splits
                        ExecutorService pool = CommonThreadPool.get(par);
                        InputSplit[] splits = informat.getSplits(job, par);
-                       ArrayList<ReadTask> tasks = new ArrayList<>();
-                       for( InputSplit split : splits ){
-                               ReadTask t = new ReadTask(split, informat, job, 
dest, rlen, clen, _isMMFile, _mmProps);
-                               tasks.add(t);
+                       
+                       //count nnz per row for sparse preallocation
+                       if( dest.isInSparseFormat() ) {
+                               int[] rNnz = new int[(int)rlen];
+                               boolean isSymmetric = _isMMFile && 
_mmProps.isSymmetric();
+                               List<CountNnzTask> tasks = Arrays.stream(splits)
+                                       .map(s ->new CountNnzTask(s, informat, 
job, rNnz, isSymmetric))
+                                       .collect(Collectors.toList());
+                               List<Future<Void>> rt1 = pool.invokeAll(tasks);
+                               for( Future<Void> task : rt1 )
+                                       task.get();
+                               SparseBlock sblock = 
dest.allocateBlock().getSparseBlock();
+                               for( int i=0; i<rlen; i++ )
+                                       if( rNnz[i] > 0 )
+                                               sblock.allocate(i, 
UtilFunctions.roundToNext(rNnz[i], 4));
                        }
                        
-                       //wait until all tasks have been executed
-                       List<Future<Long>> rt = pool.invokeAll(tasks);
+                       //create and execute read tasks for all splits
+                       List<ReadTask> tasks = Arrays.stream(splits)
+                               .map(s ->new ReadTask(s, informat, job, dest, 
rlen, clen, _isMMFile, _mmProps))
+                               .collect(Collectors.toList());
+                       List<Future<Long>> rt2 = pool.invokeAll(tasks);
                        
                        //check for exceptions and aggregate nnz
                        long lnnz = 0;
-                       for( Future<Long> task : rt )
+                       for( Future<Long> task : rt2 )
                                lnnz += task.get();
                        
                        //post-processing
@@ -220,6 +236,46 @@ public class ReaderTextCellParallel extends ReaderTextCell
                }
        }
        
+       public static class CountNnzTask implements Callable<Void> {
+               private final InputSplit _split;
+               private final TextInputFormat _informat;
+               private final JobConf _job;
+               private final int[] _rNnz;
+               private final boolean _isSymmetric;
+               
+               public CountNnzTask( InputSplit split, TextInputFormat 
informat, JobConf job, int[] rNnz, boolean isSymmetric ) {
+                       _split = split;
+                       _informat = informat;
+                       _job = job;
+                       _rNnz = rNnz;
+                       _isSymmetric = isSymmetric;
+               }
+
+               @Override
+               public Void call() throws Exception {
+                       LongWritable key = new LongWritable();
+                       Text value = new Text();
+                       FastStringTokenizer st = new FastStringTokenizer(' ');
+                       
+                       RecordReader<LongWritable,Text> reader = 
_informat.getRecordReader(_split, _job, Reporter.NULL);
+                       try {
+                               //counting without locking as conflicts unlikely
+                               while( reader.next(key, value) ) {
+                                       if( value.toString().charAt(0) == '%' )
+                                               continue;
+                                       st.reset( value.toString() );
+                                       _rNnz[(int)st.nextLong()-1] ++;
+                                       if( _isSymmetric )
+                                               _rNnz[(int)st.nextLong()-1] ++;
+                               }
+                       }
+                       finally {
+                               IOUtilFunctions.closeSilently(reader);
+                       }
+                       return null;
+               }
+       }
+       
        /**
         * Useful class for buffering unordered cells before locking target 
onces and
         * appending all buffered cells.

Reply via email to