Repository: systemml Updated Branches: refs/heads/master 375ad1a14 -> 44a1a67df
[SYSTEMML-2460] Performance matrix market reader sparse/ultra-sparse This patch improves the parallel textcell and matrix market reader for sparse and ultra-sparse inputs. Especially for matrix market with pattern matrices (that do not require expensive double parsing), garbage collection overheads often dominate. We now make an additional parallel pass over the input to determine the nnz per row for preallocation. On the following graph datasets, this patch improved the singlenode read performance by almost 3x (but these runtimes are still mostly dominated by garbage collection): a) germany_osm (11M x 11M, 24M non-zeros): 27.9s -> 10s b) uk-2005 (39M x 39M, 936M non-zeros): 332s -> 120.1s Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/aca62baa Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/aca62baa Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/aca62baa Branch: refs/heads/master Commit: aca62baa1ca2de5aaeb30b9becf5fcea26247411 Parents: 375ad1a Author: Matthias Boehm <[email protected]> Authored: Sat Jul 21 00:07:21 2018 -0700 Committer: Matthias Boehm <[email protected]> Committed: Sat Jul 21 00:07:21 2018 -0700 ---------------------------------------------------------------------- .../runtime/io/ReaderTextCellParallel.java | 74 +++++++++++++++++--- 1 file changed, 65 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/aca62baa/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java index de948f3..864dfda 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java +++ 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java @@ -20,11 +20,12 @@ package org.apache.sysml.runtime.io; import java.io.IOException; -import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.stream.Collectors; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; @@ -40,9 +41,11 @@ import org.apache.sysml.runtime.matrix.data.DenseBlock; import org.apache.sysml.runtime.matrix.data.IJV; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.SparseBlock; import org.apache.sysml.runtime.util.CommonThreadPool; import org.apache.sysml.runtime.util.FastStringTokenizer; import org.apache.sysml.runtime.util.MapReduceTool; +import org.apache.sysml.runtime.util.UtilFunctions; /** * Parallel version of ReaderTextCell.java. 
To summarize, we create read tasks per split @@ -91,21 +94,34 @@ public class ReaderTextCellParallel extends ReaderTextCell try { - //create read tasks for all splits ExecutorService pool = CommonThreadPool.get(par); InputSplit[] splits = informat.getSplits(job, par); - ArrayList<ReadTask> tasks = new ArrayList<>(); - for( InputSplit split : splits ){ - ReadTask t = new ReadTask(split, informat, job, dest, rlen, clen, _isMMFile, _mmProps); - tasks.add(t); + + //count nnz per row for sparse preallocation + if( dest.isInSparseFormat() ) { + int[] rNnz = new int[(int)rlen]; + boolean isSymmetric = _isMMFile && _mmProps.isSymmetric(); + List<CountNnzTask> tasks = Arrays.stream(splits) + .map(s ->new CountNnzTask(s, informat, job, rNnz, isSymmetric)) + .collect(Collectors.toList()); + List<Future<Void>> rt1 = pool.invokeAll(tasks); + for( Future<Void> task : rt1 ) + task.get(); + SparseBlock sblock = dest.allocateBlock().getSparseBlock(); + for( int i=0; i<rlen; i++ ) + if( rNnz[i] > 0 ) + sblock.allocate(i, UtilFunctions.roundToNext(rNnz[i], 4)); } - //wait until all tasks have been executed - List<Future<Long>> rt = pool.invokeAll(tasks); + //create and execute read tasks for all splits + List<ReadTask> tasks = Arrays.stream(splits) + .map(s ->new ReadTask(s, informat, job, dest, rlen, clen, _isMMFile, _mmProps)) + .collect(Collectors.toList()); + List<Future<Long>> rt2 = pool.invokeAll(tasks); //check for exceptions and aggregate nnz long lnnz = 0; - for( Future<Long> task : rt ) + for( Future<Long> task : rt2 ) lnnz += task.get(); //post-processing @@ -220,6 +236,46 @@ public class ReaderTextCellParallel extends ReaderTextCell } } + public static class CountNnzTask implements Callable<Void> { + private final InputSplit _split; + private final TextInputFormat _informat; + private final JobConf _job; + private final int[] _rNnz; + private final boolean _isSymmetric; + + public CountNnzTask( InputSplit split, TextInputFormat informat, JobConf job, int[] rNnz, boolean 
isSymmetric ) { + _split = split; + _informat = informat; + _job = job; + _rNnz = rNnz; + _isSymmetric = isSymmetric; + } + + @Override + public Void call() throws Exception { + LongWritable key = new LongWritable(); + Text value = new Text(); + FastStringTokenizer st = new FastStringTokenizer(' '); + + RecordReader<LongWritable,Text> reader = _informat.getRecordReader(_split, _job, Reporter.NULL); + try { + //counting without locking as conflicts unlikely + while( reader.next(key, value) ) { + if( value.toString().charAt(0) == '%' ) + continue; + st.reset( value.toString() ); + _rNnz[(int)st.nextLong()-1] ++; + if( _isSymmetric ) + _rNnz[(int)st.nextLong()-1] ++; + } + } + finally { + IOUtilFunctions.closeSilently(reader); + } + return null; + } + } + /** * Useful class for buffering unordered cells before locking target onces and * appending all buffered cells.
