Repository: incubator-systemml Updated Branches: refs/heads/master eebb9665a -> b75823310
[SYSTEMML-573] Fix csv frame readers (schema, csv properties, unknowns) Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b5842fbe Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b5842fbe Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b5842fbe Branch: refs/heads/master Commit: b5842fbe04edadacf3165acd8e721c946c5eb73f Parents: eebb966 Author: Matthias Boehm <[email protected]> Authored: Mon Jun 13 16:48:40 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Mon Jun 13 16:48:40 2016 -0700 ---------------------------------------------------------------------- .../controlprogram/caching/FrameObject.java | 9 +++++++-- .../sysml/runtime/io/FrameReaderTextCSV.java | 17 ++++++++++------- .../runtime/io/FrameReaderTextCSVParallel.java | 8 ++------ .../sysml/runtime/matrix/data/FrameBlock.java | 8 ++++++++ 4 files changed, 27 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b5842fbe/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java index b850301..33436b6 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java @@ -152,10 +152,15 @@ public class FrameObject extends CacheableData<FrameBlock> MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData; MatrixCharacteristics mc = iimd.getMatrixCharacteristics(); + //handle missing schema if necessary + List<ValueType> lschema = (_schema!=null) ? _schema : + Collections.nCopies(clen>=1 ? (int)clen : 1, ValueType.STRING); + + //read the frame block FrameBlock data = null; try { - FrameReader reader = FrameReaderFactory.createFrameReader(iimd.getInputInfo()); - data = reader.readFrameFromHDFS(fname, _schema, mc.getRows(), mc.getCols()); + FrameReader reader = FrameReaderFactory.createFrameReader(iimd.getInputInfo(), getFileFormatProperties()); + data = reader.readFrameFromHDFS(fname, lschema, mc.getRows(), mc.getCols()); } catch( DMLRuntimeException ex ) { throw new IOException(ex); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b5842fbe/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java index d282aef..05796fd 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java @@ -20,6 +20,7 @@ package org.apache.sysml.runtime.io; import java.io.IOException; +import java.util.Arrays; import java.util.List; import org.apache.commons.lang.StringUtils; @@ -74,6 +75,7 @@ public class FrameReaderTextCSV extends FrameReader JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); FileSystem fs = FileSystem.get(job); Path path = new Path( fname ); + FileInputFormat.addInputPath(job, path); //check existence and non-empty file checkValidInputFile(fs, path); @@ -113,13 +115,12 @@ public class FrameReaderTextCSV extends FrameReader FrameBlock dest, List<ValueType> schema, List<String> names, long rlen, long clen) throws IOException { - FileInputFormat.addInputPath(job, path); TextInputFormat informat = new TextInputFormat(); informat.configure(job); InputSplit[] splits = informat.getSplits(job, 1); splits = IOUtilFunctions.sortInputSplits(splits); for( int i=0; i<splits.length; i++ ) - readCSVFrameFrameFromInputSplit(splits[i], informat, job, dest, schema, names, rlen, clen, 0, i==0); + readCSVFrameFromInputSplit(splits[i], informat, job, dest, schema, names, rlen, clen, 0, i==0); } @@ -138,7 +139,7 @@ public class FrameReaderTextCSV extends FrameReader * @return * @throws IOException */ - protected final void readCSVFrameFrameFromInputSplit( InputSplit split, TextInputFormat informat, JobConf job, + protected final void readCSVFrameFromInputSplit( InputSplit split, TextInputFormat informat, JobConf job, FrameBlock dest, List<ValueType> schema, List<String> names, long rlen, long clen, int rl, boolean first) throws IOException { @@ -156,8 +157,11 @@ public class FrameReaderTextCSV extends FrameReader int col = -1; //handle header if existing - if(first && hasHeader ) - reader.next(key, value); //ignore header + if(first && hasHeader ) { + reader.next(key, value); //read header + List<String> colnames = Arrays.asList(value.toString().split(delim)); + dest.setColumnNames(colnames); + } // Read the data boolean emptyValuesFound = false; @@ -207,8 +211,7 @@ public class FrameReaderTextCSV extends FrameReader */ protected Pair<Integer,Integer> computeCSVSize( Path path, JobConf job, FileSystem fs) throws IOException - { - FileInputFormat.addInputPath(job, path); + { TextInputFormat informat = new TextInputFormat(); informat.configure(job); InputSplit[] splits = informat.getSplits(job, 1); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b5842fbe/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java index b998fce..cb7a05b 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java @@ -32,7 +32,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; @@ -74,10 +73,9 @@ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV { int numThreads = OptimizerUtils.getParallelTextReadParallelism(); - FileInputFormat.addInputPath(job, path); TextInputFormat informat = new TextInputFormat(); informat.configure(job); - InputSplit[] splits = informat.getSplits(job, numThreads); + InputSplit[] splits = informat.getSplits(job, numThreads); splits = IOUtilFunctions.sortInputSplits(splits); try @@ -120,7 +118,6 @@ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV { int numThreads = OptimizerUtils.getParallelTextReadParallelism(); - FileInputFormat.addInputPath(job, path); TextInputFormat informat = new TextInputFormat(); informat.configure(job); InputSplit[] splits = informat.getSplits(job, numThreads); @@ -177,7 +174,6 @@ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV public Long call() throws Exception { - RecordReader<LongWritable, Text> reader = _informat.getRecordReader(_split, _job, Reporter.NULL); LongWritable key = new LongWritable(); Text value = new Text(); @@ -227,7 +223,7 @@ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV public Object call() throws Exception { - readCSVFrameFrameFromInputSplit(_split, _informat, _job, _dest, _dest.getSchema(), + readCSVFrameFromInputSplit(_split, _informat, _job, _dest, _dest.getSchema(), _dest.getColumnNames(), _dest.getNumRows(), _dest.getNumColumns(), _offset, _isFirstSplit); return null; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b5842fbe/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java index 53d2001..4124c03 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java @@ -149,6 +149,14 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable } /** + * + * @param colnames + */ + public void setColumnNames(List<String> colnames) { + _colnames = colnames; + } + + /** * Creates a mapping from column names to column IDs, i.e., * 1-based column indexes *
