Repository: systemml
Updated Branches:
  refs/heads/master 92ecae34d -> 5315e1d2d


[SYSTEMML-1682] Performance csv read w/ unknown size information

This patch improves the performance of reading moderately large csv
files with unknown dimensions, which so far always triggered a csv
reblock on Spark. In detail, the improvements include (1) deciding on
in-memory reblock based on the given file size (similar to our MR
backend), (2) avoiding spark context creation (by checking for cached
inputs on Spark checkpoint instructions), and (3) fixing the handling
of the estimated number of non-zeros (robustness for unknown dimensions).

For example, on the US census 1990 dataset (2458285 x 68, dense; 470MB
csv / 1.3GB binary), this patch improved performance from 39s to 4.3s
(which is good given that the csv read w/ known sizes requires 3.8s).  


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/5315e1d2
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/5315e1d2
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/5315e1d2

Branch: refs/heads/master
Commit: 5315e1d2d13795ac14d28292cf1f508e3d5db9a2
Parents: 92ecae3
Author: Matthias Boehm <[email protected]>
Authored: Fri Jun 9 18:49:37 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Fri Jun 9 20:19:11 2017 -0700

----------------------------------------------------------------------
 .../org/apache/sysml/hops/recompile/Recompiler.java   | 14 +++++++++++---
 .../instructions/spark/CheckpointSPInstruction.java   | 10 +++++++++-
 .../org/apache/sysml/runtime/io/ReaderTextCSV.java    | 11 ++++++-----
 .../sysml/runtime/io/ReaderTextCSVParallel.java       |  5 +++--
 .../org/apache/sysml/runtime/util/DataConverter.java  |  3 ++-
 .../org/apache/sysml/runtime/util/MapReduceTool.java  |  6 ++++--
 6 files changed, 35 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/5315e1d2/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java 
b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
index c92b735..25efecf 100644
--- a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
@@ -123,8 +123,8 @@ public class Recompiler
        //Max threshold for in-memory reblock of text input [in bytes]
        //reason: single-threaded text read at 20MB/s, 1GB input -> 50s (should 
exploit parallelism)
        //note that we scale this threshold up by the degree of available 
parallelism
-       private static final long CP_REBLOCK_THRESHOLD_SIZE = 
(long)1024*1024*1024; 
-       private static final long CP_CSV_REBLOCK_UNKNOWN_THRESHOLD_SIZE = 
(long)256*1024*1024;
+       private static final long CP_REBLOCK_THRESHOLD_SIZE = 
1L*1024*1024*1024; 
+       private static final long CP_CSV_REBLOCK_UNKNOWN_THRESHOLD_SIZE = 
CP_REBLOCK_THRESHOLD_SIZE;
        
        /** Local reused rewriter for dynamic rewrites during recompile */
 
@@ -1817,7 +1817,15 @@ public class Recompiler
                
                //robustness unknown dimensions, e.g., for csv reblock
                if( rows <= 0 || cols <= 0 ) {
-                       return false;
+                       try {
+                               long size = MapReduceTool.getFilesizeOnHDFS(new 
Path(obj.getFileName()));
+                               return (size < 
OptimizerUtils.getLocalMemBudget() &&
+                                       size < 
CP_CSV_REBLOCK_UNKNOWN_THRESHOLD_SIZE * 
+                                       
OptimizerUtils.getParallelTextReadParallelism());
+                       } 
+                       catch(IllegalArgumentException | IOException ex) {
+                               throw new DMLRuntimeException(ex);
+                       }
                }
                
                //check valid dimensions and memory requirements

http://git-wip-us.apache.org/repos/asf/systemml/blob/5315e1d2/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
index cddfd12..e7c1c78 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CheckpointSPInstruction.java
@@ -77,7 +77,7 @@ public class CheckpointSPInstruction extends 
UnarySPInstruction
        {
                SparkExecutionContext sec = (SparkExecutionContext)ec;
                
-               // Step 1: early abort on non-existing inputs 
+               // Step 1: early abort on non-existing or in-memory (cached) 
inputs
                // -------
                // (checkpoints are generated for all read only variables in 
loops; due to unbounded scoping and 
                // conditional control flow they to not necessarily exist in 
the symbol table during runtime - 
@@ -88,6 +88,14 @@ public class CheckpointSPInstruction extends 
UnarySPInstruction
                        sec.setVariable( output.getName(), new 
BooleanObject(false));
                        return;
                }
+               //-------
+               //(for csv input files with unknown dimensions, we might have 
generated a checkpoint after
+               //csvreblock although not necessary because the csvreblock was 
subject to in-memory reblock)
+               CacheableData<?> obj = sec.getCacheableData(input1.getName());
+               if( obj.isCached(true) ) { //available in memory
+                       sec.setVariable(output.getName(), obj);
+                       return;
+               }
                
                //get input rdd handle (for matrix or frame)
                JavaPairRDD<?,?> in = sec.getRDDHandleForVariable( 
input1.getName(), InputInfo.BinaryBlockInputInfo );

http://git-wip-us.apache.org/repos/asf/systemml/blob/5315e1d2/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
index 27637ab..42b1184 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
@@ -55,7 +55,7 @@ public class ReaderTextCSV extends MatrixReader
        {
                //allocate output matrix block
                MatrixBlock ret = null;
-               if( rlen>0 && clen>0 ) //otherwise CSV reblock based on file 
size for matrix w/ unknown dimensions
+               if( rlen>0 && clen>0 ) //otherwise allocated on read
                        ret = createOutputMatrixBlock(rlen, clen, (int)rlen, 
(int)clen, estnnz, true, false);
                
                //prepare file access
@@ -98,7 +98,7 @@ public class ReaderTextCSV extends MatrixReader
        @SuppressWarnings("unchecked")
        private MatrixBlock readCSVMatrixFromHDFS( Path path, JobConf job, 
FileSystem fs, MatrixBlock dest, 
                        long rlen, long clen, int brlen, int bclen, boolean 
hasHeader, String delim, boolean fill, double fillValue )
-               throws IOException
+               throws IOException, DMLRuntimeException
        {
                //prepare file paths in alphanumeric order
                ArrayList<Path> files=new ArrayList<Path>();
@@ -222,7 +222,7 @@ public class ReaderTextCSV extends MatrixReader
        }
 
        private MatrixBlock computeCSVSize( List<Path> files, JobConf job, 
FileSystem fs, boolean hasHeader, String delim, boolean fill, double fillValue) 
-               throws IOException 
+               throws IOException, DMLRuntimeException 
        {               
                int nrow = -1;
                int ncol = -1;
@@ -255,7 +255,8 @@ public class ReaderTextCSV extends MatrixReader
                        }
                }
                
-               //create new matrix block (assume sparse for consistency w/ 
compiler)
-               return new MatrixBlock(nrow, ncol, true);
+               // allocate target matrix block based on given size; 
+               return createOutputMatrixBlock(nrow, ncol, 
+                       nrow, ncol, (long)nrow*ncol, true, false);
        }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/5315e1d2/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
index 88388f4..2fbe32f 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
@@ -210,10 +210,11 @@ public class ReaderTextCSVParallel extends MatrixReader
                catch (Exception e) {
                        throw new IOException("Threadpool Error " + 
e.getMessage(), e);
                }
-
+               
                // allocate target matrix block based on given size; 
                // need to allocate sparse as well since lock-free insert into 
target
-               return createOutputMatrixBlock(nrow, ncol, nrow, ncol, estnnz, 
true, true);
+               long estnnz2 = (estnnz < 0) ? (long)nrow * ncol : estnnz;
+               return createOutputMatrixBlock(nrow, ncol, nrow, ncol, estnnz2, 
true, true);
        }
 
        private static class SplitOffsetInfos {

http://git-wip-us.apache.org/repos/asf/systemml/blob/5315e1d2/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java 
b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
index ab4d47b..b016fdf 100644
--- a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java
@@ -194,7 +194,8 @@ public class DataConverter
        {       
                //Timing time = new Timing(true);
                
-               long estnnz = (long)(prop.expectedSparsity*prop.rlen*prop.clen);
+               long estnnz = (prop.expectedSparsity <= 0 || prop.rlen <= 0 || 
prop.clen <= 0) ? 
+                       -1 : (long)(prop.expectedSparsity*prop.rlen*prop.clen);
        
                //core matrix reading 
                MatrixBlock ret = null;

http://git-wip-us.apache.org/repos/asf/systemml/blob/5315e1d2/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java 
b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
index e81dbc1..5bdccfd 100644
--- a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
+++ b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
@@ -510,7 +510,8 @@ public class MapReduceTool
                throws IOException, DMLRuntimeException
        {
                MatrixReader reader = 
MatrixReaderFactory.createMatrixReader(inputinfo);
-               MatrixBlock mb = reader.readMatrixFromHDFS(dir, rlen, clen, 
brlen, bclen, rlen*clen);
+               long estnnz = (rlen <= 0 || clen <= 0) ? -1 : rlen * clen;
+               MatrixBlock mb = reader.readMatrixFromHDFS(dir, rlen, clen, 
brlen, bclen, estnnz);
                return DataConverter.convertToDoubleMatrix(mb);
        }
        
@@ -518,7 +519,8 @@ public class MapReduceTool
                throws IOException, DMLRuntimeException
        {
                MatrixReader reader = 
MatrixReaderFactory.createMatrixReader(inputinfo);
-               MatrixBlock mb = reader.readMatrixFromHDFS(dir, rlen, clen, 
brlen, bclen, rlen*clen);
+               long estnnz = (rlen <= 0 || clen <= 0) ? -1 : rlen * clen;
+               MatrixBlock mb = reader.readMatrixFromHDFS(dir, rlen, clen, 
brlen, bclen, estnnz);
                return DataConverter.convertToDoubleVector(mb);
        }
        

Reply via email to