Repository: systemml Updated Branches: refs/heads/master 92ecae34d -> 5315e1d2d
[SYSTEMML-1682] Performance csv read w/ unknown size information This patch improves the performance of reading moderately large csv files with unknown dimensions, which so far always triggered a csv reblock on spark. In detail, the improvements include (1) deciding on in-memory reblock based on the given file size (similar to our MR backend), (2) avoiding spark context creation (by checking for cached inputs on spark checkpoint instructions), and (3) fixing the handling of the estimated number of non-zeros (robustness for unknown dimensions). For example, on the US census 1990 dataset (2458285 x 68, dense; 470MB csv / 1.3GB binary), this patch improved performance from 39s to 4.3s (which is good given that the csv read w/ known sizes requires 3.8s). Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/5315e1d2 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/5315e1d2 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/5315e1d2 Branch: refs/heads/master Commit: 5315e1d2d13795ac14d28292cf1f508e3d5db9a2 Parents: 92ecae3 Author: Matthias Boehm <[email protected]> Authored: Fri Jun 9 18:49:37 2017 -0700 Committer: Matthias Boehm <[email protected]> Committed: Fri Jun 9 20:19:11 2017 -0700 ---------------------------------------------------------------------- .../org/apache/sysml/hops/recompile/Recompiler.java | 14 +++++++++++--- .../instructions/spark/CheckpointSPInstruction.java | 10 +++++++++- .../org/apache/sysml/runtime/io/ReaderTextCSV.java | 11 ++++++----- .../sysml/runtime/io/ReaderTextCSVParallel.java | 5 +++-- .../org/apache/sysml/runtime/util/DataConverter.java | 3 ++- .../org/apache/sysml/runtime/util/MapReduceTool.java | 6 ++++-- 6 files changed, 35 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/5315e1d2/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java index c92b735..25efecf 100644 --- a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java +++ b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java @@ -123,8 +123,8 @@ public class Recompiler //Max threshold for in-memory reblock of text input [in bytes] //reason: single-threaded text read at 20MB/s, 1GB input -> 50s (should exploit parallelism) //note that we scale this threshold up by the degree of available parallelism - private static final long CP_REBLOCK_THRESHOLD_SIZE = (long)1024*1024*1024; - private static final long CP_CSV_REBLOCK_UNKNOWN_THRESHOLD_SIZE = (long)256*1024*1024; + private static final long CP_REBLOCK_THRESHOLD_SIZE = 1L*1024*1024*1024; + private static final long CP_CSV_REBLOCK_UNKNOWN_THRESHOLD_SIZE = CP_REBLOCK_THRESHOLD_SIZE; /** Local reused rewriter for dynamic rewrites during recompile */ @@ -1817,7 +1817,15 @@ public class Recompiler //robustness unknown dimensions, e.g., for csv reblock if( rows <= 0 || cols <= 0 ) { - return false; + try { + long size = MapReduceTool.getFilesizeOnHDFS(new Path(obj.getFileName())); + return (size < OptimizerUtils.getLocalMemBudget() && + size < CP_CSV_REBLOCK_UNKNOWN_THRESHOLD_SIZE * + OptimizerUtils.getParallelTextReadParallelism()); + } + catch(IllegalArgumentException | IOException ex) { + throw new DMLRuntimeException(ex); + } } //check valid dimensions and memory requirements http://git-wip-us.apache.org/repos/asf/systemml/blob/5315e1d2/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java index cddfd12..e7c1c78 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java @@ -77,7 +77,7 @@ public class CheckpointSPInstruction extends UnarySPInstruction { SparkExecutionContext sec = (SparkExecutionContext)ec; - // Step 1: early abort on non-existing inputs + // Step 1: early abort on non-existing or in-memory (cached) inputs // ------- // (checkpoints are generated for all read only variables in loops; due to unbounded scoping and // conditional control flow they do not necessarily exist in the symbol table during runtime - @@ -88,6 +88,14 @@ public class CheckpointSPInstruction extends UnarySPInstruction sec.setVariable( output.getName(), new BooleanObject(false)); return; } + //------- + //(for csv input files with unknown dimensions, we might have generated a checkpoint after + //csvreblock although not necessary because the csvreblock was subject to in-memory reblock) + CacheableData<?> obj = sec.getCacheableData(input1.getName()); + if( obj.isCached(true) ) { //available in memory + sec.setVariable(output.getName(), obj); + return; + } //get input rdd handle (for matrix or frame) JavaPairRDD<?,?> in = sec.getRDDHandleForVariable( input1.getName(), InputInfo.BinaryBlockInputInfo ); http://git-wip-us.apache.org/repos/asf/systemml/blob/5315e1d2/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java index 27637ab..42b1184 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java @@ -55,7 +55,7 @@ public class ReaderTextCSV extends 
MatrixReader { //allocate output matrix block MatrixBlock ret = null; - if( rlen>0 && clen>0 ) //otherwise CSV reblock based on file size for matrix w/ unknown dimensions + if( rlen>0 && clen>0 ) //otherwise allocated on read ret = createOutputMatrixBlock(rlen, clen, (int)rlen, (int)clen, estnnz, true, false); //prepare file access @@ -98,7 +98,7 @@ public class ReaderTextCSV extends MatrixReader @SuppressWarnings("unchecked") private MatrixBlock readCSVMatrixFromHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock dest, long rlen, long clen, int brlen, int bclen, boolean hasHeader, String delim, boolean fill, double fillValue ) - throws IOException + throws IOException, DMLRuntimeException { //prepare file paths in alphanumeric order ArrayList<Path> files=new ArrayList<Path>(); @@ -222,7 +222,7 @@ public class ReaderTextCSV extends MatrixReader } private MatrixBlock computeCSVSize( List<Path> files, JobConf job, FileSystem fs, boolean hasHeader, String delim, boolean fill, double fillValue) - throws IOException + throws IOException, DMLRuntimeException { int nrow = -1; int ncol = -1; @@ -255,7 +255,8 @@ public class ReaderTextCSV extends MatrixReader } } - //create new matrix block (assume sparse for consistency w/ compiler) - return new MatrixBlock(nrow, ncol, true); + // allocate target matrix block based on given size; + return createOutputMatrixBlock(nrow, ncol, + nrow, ncol, (long)nrow*ncol, true, false); } } http://git-wip-us.apache.org/repos/asf/systemml/blob/5315e1d2/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java index 88388f4..2fbe32f 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java @@ -210,10 +210,11 @@ 
public class ReaderTextCSVParallel extends MatrixReader catch (Exception e) { throw new IOException("Threadpool Error " + e.getMessage(), e); } - + // allocate target matrix block based on given size; // need to allocate sparse as well since lock-free insert into target - return createOutputMatrixBlock(nrow, ncol, nrow, ncol, estnnz, true, true); + long estnnz2 = (estnnz < 0) ? (long)nrow * ncol : estnnz; + return createOutputMatrixBlock(nrow, ncol, nrow, ncol, estnnz2, true, true); } private static class SplitOffsetInfos { http://git-wip-us.apache.org/repos/asf/systemml/blob/5315e1d2/src/main/java/org/apache/sysml/runtime/util/DataConverter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java index ab4d47b..b016fdf 100644 --- a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java +++ b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java @@ -194,7 +194,8 @@ public class DataConverter { //Timing time = new Timing(true); - long estnnz = (long)(prop.expectedSparsity*prop.rlen*prop.clen); + long estnnz = (prop.expectedSparsity <= 0 || prop.rlen <= 0 || prop.clen <= 0) ? 
+ -1 : (long)(prop.expectedSparsity*prop.rlen*prop.clen); //core matrix reading MatrixBlock ret = null; http://git-wip-us.apache.org/repos/asf/systemml/blob/5315e1d2/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java index e81dbc1..5bdccfd 100644 --- a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java +++ b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java @@ -510,7 +510,8 @@ public class MapReduceTool throws IOException, DMLRuntimeException { MatrixReader reader = MatrixReaderFactory.createMatrixReader(inputinfo); - MatrixBlock mb = reader.readMatrixFromHDFS(dir, rlen, clen, brlen, bclen, rlen*clen); + long estnnz = (rlen <= 0 || clen <= 0) ? -1 : rlen * clen; + MatrixBlock mb = reader.readMatrixFromHDFS(dir, rlen, clen, brlen, bclen, estnnz); return DataConverter.convertToDoubleMatrix(mb); } @@ -518,7 +519,8 @@ public class MapReduceTool throws IOException, DMLRuntimeException { MatrixReader reader = MatrixReaderFactory.createMatrixReader(inputinfo); - MatrixBlock mb = reader.readMatrixFromHDFS(dir, rlen, clen, brlen, bclen, rlen*clen); + long estnnz = (rlen <= 0 || clen <= 0) ? -1 : rlen * clen; + MatrixBlock mb = reader.readMatrixFromHDFS(dir, rlen, clen, brlen, bclen, estnnz); return DataConverter.convertToDoubleVector(mb); }
