[SYSTEMML-630] Fix robustness of csv frame readers (count num columns)

In corner cases, our distributed csv write produces empty partitions and hence empty csv file splits. Depending on the ordering of splits, this causes problems when determining the number of columns on csv read if the data contains only a single row. This patch introduces a more robust approach for computing the number of columns on csv read, where we read the first rows of consecutive splits until we find a row with ncol>0.
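To make the corner case concrete: when the first (sorted) split is empty, the old readers derived the column count from an empty line, so ncol silently became 1 regardless of the actual schema, whereas the new approach keeps scanning splits until it finds a non-empty row. The following small snippet is purely illustrative (not part of this commit) and only contrasts the two computations:

import org.apache.commons.lang.StringUtils;

public class EmptyFirstSplitExample {
	public static void main(String[] args) {
		String delim = ",";

		//pre-patch behavior: ncol is derived from the first line of the first
		//split only; if that split is empty, the line is "" and ncol becomes 1
		String firstSplitLine = "";
		int ncolOld = StringUtils.countMatches(firstSplitLine, delim) + 1;
		System.out.println(ncolOld); //1, although the frame has 3 columns

		//patched behavior: skip empty rows/splits and derive ncol from the
		//first non-empty row found across all (sorted) splits
		String firstNonEmptyRow = "1,abc,3.5";
		int ncolNew = StringUtils.countMatches(firstNonEmptyRow, delim) + 1;
		System.out.println(ncolNew); //3
	}
}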
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/34a2e1d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/34a2e1d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/34a2e1d8

Branch: refs/heads/master
Commit: 34a2e1d8a6f7c7247dd8f8e505b4c2054f4b2589
Parents: 8c4f83a
Author: Matthias Boehm <[email protected]>
Authored: Fri Jun 24 23:08:52 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Fri Jun 24 23:08:52 2016 -0700

----------------------------------------------------------------------
 .../sysml/runtime/io/FrameReaderTextCSV.java    | 21 ++++------
 .../runtime/io/FrameReaderTextCSVParallel.java  | 16 ++------
 .../sysml/runtime/io/IOUtilFunctions.java       | 40 ++++++++++++++++++++
 .../TransformFrameEncodeDecodeTest.java         |  2 -
 4 files changed, 52 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/34a2e1d8/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
index 05796fd..b5a5756 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -217,26 +216,22 @@ public class FrameReaderTextCSV extends FrameReader
 		InputSplit[] splits = informat.getSplits(job, 1);
 		splits = IOUtilFunctions.sortInputSplits(splits);
 		
-		boolean first = true;
-		int ncol = -1;
-		int nrow = -1;
+		//compute number of columns
+		int ncol = IOUtilFunctions.countNumColumnsCSV(splits, informat, job, _props.getDelim());
 		
-		for( InputSplit split : splits )
+		//compute number of rows
+		int nrow = -1;
+		for( int i=0; i<splits.length; i++ )
 		{
-			RecordReader<LongWritable, Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
+			RecordReader<LongWritable, Text> reader = informat.getRecordReader(splits[i], job, Reporter.NULL);
 			LongWritable key = new LongWritable();
 			Text value = new Text();
 			
 			try
 			{
-				//read head and first line to determine num columns
-				if( first ) {
-					if ( _props.hasHeader() )
-						reader.next(key, value); //ignore header
+				//ignore header of first split
+				if( i==0 && _props.hasHeader() )
 					reader.next(key, value);
-					ncol = StringUtils.countMatches(value.toString(), _props.getDelim()) + 1;
-					nrow = 1; first = false;
-				}
 				
 				//count remaining number of rows
 				while ( reader.next(key, value) )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/34a2e1d8/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
index cb7a05b..da71905 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -96,7 +95,7 @@ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV
 			offset += count.get();
 		}
 		
-		//read individial splits
+		//read individual splits
 		ArrayList<ReadRowsTask> tasks2 = new ArrayList<ReadRowsTask>();
 		for( int i=0; i<splits.length; i++ )
 			tasks2.add( new ReadRowsTask(splits[i], informat, job, dest, offsets.get(i).intValue(), i==0));
@@ -115,7 +114,7 @@ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV
 	@Override
 	protected Pair<Integer,Integer> computeCSVSize( Path path, JobConf job, FileSystem fs)
 		throws IOException
-	{	
+	{
 		int numThreads = OptimizerUtils.getParallelTextReadParallelism();
 		
 		TextInputFormat informat = new TextInputFormat();
@@ -123,18 +122,11 @@ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV
 		InputSplit[] splits = informat.getSplits(job, numThreads);
 		
 		//compute number of columns
-		RecordReader<LongWritable, Text> reader = informat.getRecordReader(splits[0], job, Reporter.NULL);
-		LongWritable key = new LongWritable();
-		Text value = new Text();
-		reader.next(key, value);
-		int ncol = StringUtils.countMatches(value.toString(), _props.getDelim()) + 1;
-		reader.close();
+		int ncol = IOUtilFunctions.countNumColumnsCSV(splits, informat, job, _props.getDelim());
 		
 		//compute number of rows
-		ExecutorService pool = Executors.newFixedThreadPool(numThreads);
-		
-		//compute num rows per split
 		int nrow = 0;
+		ExecutorService pool = Executors.newFixedThreadPool(numThreads);
 		try {
 			ArrayList<CountRowsTask> tasks = new ArrayList<CountRowsTask>();
 			for( int i=0; i<splits.length; i++ )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/34a2e1d8/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
index 48787b1..1b80f90 100644
--- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
@@ -31,9 +31,14 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
 
 import org.apache.sysml.runtime.util.LocalFileUtils;
 import org.apache.sysml.runtime.util.UtilFunctions;
@@ -182,4 +187,39 @@ public class IOUtilFunctions
 		}
 		return splits;
 	}
+	
+	/**
+	 * Counts the number of columns in a given collection of csv file splits. This primitive aborts 
+	 * if a row with more than 0 columns is found and hence is robust against empty file splits etc.
+	 * 
+	 * @param splits
+	 * @param informat
+	 * @param job
+	 * @param delim
+	 * @return
+	 * @throws IOException
+	 */
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	public static int countNumColumnsCSV(InputSplit[] splits, InputFormat informat, JobConf job, String delim )
+		throws IOException
+	{
+		LongWritable key = new LongWritable();
+		Text value = new Text();
+		int ncol = -1;
+		for( int i=0; i<splits.length && ncol<=0; i++ ) {
+			RecordReader<LongWritable, Text> reader =
+				informat.getRecordReader(splits[i], job, Reporter.NULL);
+			try {
+				if( reader.next(key, value) ) {
+					String row = value.toString().trim();
+					if( !row.isEmpty() )
+						ncol = StringUtils.countMatches(row, delim) + 1;
+				}
+			}
+			finally {
+				closeSilently(reader);
+			}
+		}
+		return ncol;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/34a2e1d8/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeDecodeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeDecodeTest.java b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeDecodeTest.java
index 946bb3d..09f497e 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeDecodeTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/transform/TransformFrameEncodeDecodeTest.java
@@ -122,8 +122,6 @@ public class TransformFrameEncodeDecodeTest extends AutomatedTestBase
 			OptimizerUtils.ALLOW_FRAME_CSV_REBLOCK = true;
 			
 			runTest(true, false, null, -1);
-			//System.exit(1);
-			
 			//read input/output and compare
 			FrameReader reader1 = FrameReaderFactory.createFrameReader(InputInfo.CSVInputInfo,
 					new CSVFileFormatProperties(true, ",", false));
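For reference, the new IOUtilFunctions.countNumColumnsCSV helper can be wired to Hadoop's mapred input format in the same way the patched computeCSVSize methods above do. A minimal sketch, assuming a csv file on HDFS; the path and class name are placeholders:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.sysml.runtime.io.IOUtilFunctions;

public class CountColumnsExample {
	public static void main(String[] args) throws IOException {
		//configure the text input format over a csv file/directory
		JobConf job = new JobConf();
		FileInputFormat.addInputPath(job, new Path("hdfs:/tmp/frame.csv"));
		TextInputFormat informat = new TextInputFormat();
		informat.configure(job);

		//determine the number of columns robustly, even if individual
		//splits (e.g., from empty partitions) contain no rows at all
		InputSplit[] splits = informat.getSplits(job, 1);
		int ncol = IOUtilFunctions.countNumColumnsCSV(splits, informat, job, ",");
		System.out.println("ncol = " + ncol); //-1 only if all splits are empty
	}
}

Because the helper returns -1 when every split is empty, callers can still distinguish a completely empty frame from a genuine single-column frame.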
