[SYSTEMML-630] Fix robustness of csv frame readers (counting number of columns)

In corner cases, our distributed csv writer produces empty partitions and
hence empty csv file splits. Depending on the ordering of the splits, this
causes problems when determining the number of columns on csv read of
inputs that contain only a single row. This patch introduces a more robust
approach for computing the number of columns on csv read: we read the first
line of each split, in order, until we find a non-empty row (i.e., ncol>0).

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/34a2e1d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/34a2e1d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/34a2e1d8

Branch: refs/heads/master
Commit: 34a2e1d8a6f7c7247dd8f8e505b4c2054f4b2589
Parents: 8c4f83a
Author: Matthias Boehm <[email protected]>
Authored: Fri Jun 24 23:08:52 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Fri Jun 24 23:08:52 2016 -0700

----------------------------------------------------------------------
 .../sysml/runtime/io/FrameReaderTextCSV.java    | 21 ++++------
 .../runtime/io/FrameReaderTextCSVParallel.java  | 16 ++------
 .../sysml/runtime/io/IOUtilFunctions.java       | 40 ++++++++++++++++++++
 .../TransformFrameEncodeDecodeTest.java         |  2 -
 4 files changed, 52 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/34a2e1d8/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
index 05796fd..b5a5756 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -217,26 +216,22 @@ public class FrameReaderTextCSV extends FrameReader
                InputSplit[] splits = informat.getSplits(job, 1);
                splits = IOUtilFunctions.sortInputSplits(splits);
                
-               boolean first = true;
-               int ncol = -1;
-               int nrow = -1;
+               //compute number of columns
+               int ncol = IOUtilFunctions.countNumColumnsCSV(splits, informat, 
job, _props.getDelim());
                
-               for( InputSplit split : splits ) 
+               //compute number of rows
+               int nrow = -1;
+               for( int i=0; i<splits.length; i++ ) 
                {
-                       RecordReader<LongWritable, Text> reader = 
informat.getRecordReader(split, job, Reporter.NULL);
+                       RecordReader<LongWritable, Text> reader = 
informat.getRecordReader(splits[i], job, Reporter.NULL);
                        LongWritable key = new LongWritable();
                        Text value = new Text();
                        
                        try
                        {
-                               //read head and first line to determine num 
columns
-                               if( first ) {
-                                       if ( _props.hasHeader() ) 
-                                               reader.next(key, value); 
//ignore header
+                               //ignore header of first split
+                               if( i==0 && _props.hasHeader() )
                                        reader.next(key, value);
-                                       ncol = 
StringUtils.countMatches(value.toString(), _props.getDelim()) + 1;
-                                       nrow = 1; first = false;
-                               }
                                
                                //count remaining number of rows
                                while ( reader.next(key, value) )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/34a2e1d8/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
index cb7a05b..da71905 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -96,7 +95,7 @@ public class FrameReaderTextCSVParallel extends 
FrameReaderTextCSV
                                offset += count.get();
                        }
                        
-                       //read individial splits
+                       //read individual splits
                        ArrayList<ReadRowsTask> tasks2 = new 
ArrayList<ReadRowsTask>();
                        for( int i=0; i<splits.length; i++ )
                                tasks2.add( new ReadRowsTask(splits[i], 
informat, job, dest, offsets.get(i).intValue(), i==0));
@@ -115,7 +114,7 @@ public class FrameReaderTextCSVParallel extends 
FrameReaderTextCSV
        @Override
        protected Pair<Integer,Integer> computeCSVSize( Path path, JobConf job, 
FileSystem fs) 
                throws IOException 
-       {               
+       {       
                int numThreads = 
OptimizerUtils.getParallelTextReadParallelism();
                
                TextInputFormat informat = new TextInputFormat();
@@ -123,18 +122,11 @@ public class FrameReaderTextCSVParallel extends 
FrameReaderTextCSV
                InputSplit[] splits = informat.getSplits(job, numThreads);
                
                //compute number of columns
-               RecordReader<LongWritable, Text> reader = 
informat.getRecordReader(splits[0], job, Reporter.NULL);
-               LongWritable key = new LongWritable();
-               Text value = new Text();
-               reader.next(key, value);
-               int ncol = StringUtils.countMatches(value.toString(), 
_props.getDelim()) + 1;
-               reader.close();
+               int ncol = IOUtilFunctions.countNumColumnsCSV(splits, informat, 
job, _props.getDelim());
                
                //compute number of rows
-               ExecutorService pool = Executors.newFixedThreadPool(numThreads);
-               
-               //compute num rows per split
                int nrow = 0;
+               ExecutorService pool = Executors.newFixedThreadPool(numThreads);
                try {
                        ArrayList<CountRowsTask> tasks = new 
ArrayList<CountRowsTask>();
                        for( int i=0; i<splits.length; i++ )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/34a2e1d8/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java 
b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
index 48787b1..1b80f90 100644
--- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
@@ -31,9 +31,14 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.sysml.runtime.util.LocalFileUtils;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
@@ -182,4 +187,39 @@ public class IOUtilFunctions
                }               
                return splits;
        }
+       
+       /**
+        * Counts the number of columns in a given collection of csv file splits. Splits are scanned in array order and only the first record of each split is inspected;
+        * the scan stops at the first such record that is non-blank after trimming, so empty file splits are skipped. Columns = literal delimiter occurrences + 1 (delimiters inside quoted fields are not special-cased).
+        * 
+        * @param splits input splits to scan, in the given order
+        * @param informat input format used to open a record reader for each split
+        * @param job job configuration passed to {@code getRecordReader}
+        * @param delim column delimiter, counted as a literal substring
+        * @return number of columns, or -1 if no split yields a non-blank first record
+        * @throws IOException if opening or reading a split fails
+        */
+       @SuppressWarnings({ "rawtypes", "unchecked" })
+       public static int countNumColumnsCSV(InputSplit[] splits, InputFormat 
informat, JobConf job, String delim ) 
+               throws IOException 
+       {
+               LongWritable key = new LongWritable(); //key/value objects are reused across all splits
+               Text value = new Text();
+               int ncol = -1; //stays -1 if no non-blank first record is found
+               for( int i=0; i<splits.length && ncol<=0; i++ ) { //stop as soon as a split yields ncol>0
+                       RecordReader<LongWritable, Text> reader = 
+                                       informat.getRecordReader(splits[i], 
job, Reporter.NULL);
+                       try {
+                               if( reader.next(key, value) ) { //only the first record of each split is inspected; a split starting with a blank line is skipped
+                                       String row = value.toString().trim(); //whitespace-only rows count as blank
+                                       if( !row.isEmpty() )
+                                               ncol = 
StringUtils.countMatches(row, delim) + 1;
+                               }
+                       }
+                       finally {
+                               closeSilently(reader);  
+                       }
+               }
+               return ncol;
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/34a2e1d8/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeDecodeTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeDecodeTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeDecodeTest.java
index 946bb3d..09f497e 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeDecodeTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeDecodeTest.java
@@ -122,8 +122,6 @@ public class TransformFrameEncodeDecodeTest extends 
AutomatedTestBase
                        OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
                        runTest(true, false, null, -1); 
                        
-                       //System.exit(1);
-                       
                        //read input/output and compare
                        FrameReader reader1 = 
FrameReaderFactory.createFrameReader(InputInfo.CSVInputInfo, 
                                        new CSVFileFormatProperties(true, ",", 
false));

Reply via email to