Repository: incubator-systemml
Updated Branches:
  refs/heads/master eebb9665a -> b75823310


[SYSTEMML-573] Fix csv frame readers (schema, csv properties, unknowns)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b5842fbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b5842fbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b5842fbe

Branch: refs/heads/master
Commit: b5842fbe04edadacf3165acd8e721c946c5eb73f
Parents: eebb966
Author: Matthias Boehm <[email protected]>
Authored: Mon Jun 13 16:48:40 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Mon Jun 13 16:48:40 2016 -0700

----------------------------------------------------------------------
 .../controlprogram/caching/FrameObject.java        |  9 +++++++--
 .../sysml/runtime/io/FrameReaderTextCSV.java       | 17 ++++++++++-------
 .../runtime/io/FrameReaderTextCSVParallel.java     |  8 ++------
 .../sysml/runtime/matrix/data/FrameBlock.java      |  8 ++++++++
 4 files changed, 27 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b5842fbe/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
index b850301..33436b6 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/FrameObject.java
@@ -152,10 +152,15 @@ public class FrameObject extends CacheableData<FrameBlock>
                MatrixFormatMetaData iimd = (MatrixFormatMetaData) _metaData;
                MatrixCharacteristics mc = iimd.getMatrixCharacteristics();
                
+               //handle missing schema if necessary
+               List<ValueType> lschema = (_schema!=null) ? _schema : 
+                       Collections.nCopies(clen>=1 ? (int)clen : 1, 
ValueType.STRING);
+               
+               //read the frame block
                FrameBlock data = null;
                try {
-                       FrameReader reader = 
FrameReaderFactory.createFrameReader(iimd.getInputInfo());
-                       data = reader.readFrameFromHDFS(fname, _schema, 
mc.getRows(), mc.getCols()); 
+                       FrameReader reader = 
FrameReaderFactory.createFrameReader(iimd.getInputInfo(), 
getFileFormatProperties());
+                       data = reader.readFrameFromHDFS(fname, lschema, 
mc.getRows(), mc.getCols()); 
                }
                catch( DMLRuntimeException ex ) {
                        throw new IOException(ex);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b5842fbe/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
index d282aef..05796fd 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.io;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
@@ -74,6 +75,7 @@ public class FrameReaderTextCSV extends FrameReader
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
                FileSystem fs = FileSystem.get(job);
                Path path = new Path( fname );
+               FileInputFormat.addInputPath(job, path);
                
                //check existence and non-empty file
                checkValidInputFile(fs, path); 
@@ -113,13 +115,12 @@ public class FrameReaderTextCSV extends FrameReader
                        FrameBlock dest, List<ValueType> schema, List<String> 
names, long rlen, long clen) 
                throws IOException
        {
-               FileInputFormat.addInputPath(job, path);
                TextInputFormat informat = new TextInputFormat();
                informat.configure(job);
                InputSplit[] splits = informat.getSplits(job, 1);
                splits = IOUtilFunctions.sortInputSplits(splits);
                for( int i=0; i<splits.length; i++ )
-                       readCSVFrameFrameFromInputSplit(splits[i], informat, 
job, dest, schema, names, rlen, clen, 0, i==0);
+                       readCSVFrameFromInputSplit(splits[i], informat, job, 
dest, schema, names, rlen, clen, 0, i==0);
        }
        
        
@@ -138,7 +139,7 @@ public class FrameReaderTextCSV extends FrameReader
         * @return
         * @throws IOException
         */
-       protected final void readCSVFrameFrameFromInputSplit( InputSplit split, 
TextInputFormat informat, JobConf job, 
+       protected final void readCSVFrameFromInputSplit( InputSplit split, 
TextInputFormat informat, JobConf job, 
                        FrameBlock dest, List<ValueType> schema, List<String> 
names, long rlen, long clen, int rl, boolean first)
                throws IOException
        {
@@ -156,8 +157,11 @@ public class FrameReaderTextCSV extends FrameReader
                int col = -1;
                
                //handle header if existing
-               if(first && hasHeader ) 
-                       reader.next(key, value); //ignore header
+               if(first && hasHeader ) {
+                       reader.next(key, value); //read header
+                       List<String> colnames = 
Arrays.asList(value.toString().split(delim));
+                       dest.setColumnNames(colnames);
+               }
                        
                // Read the data
                boolean emptyValuesFound = false;
@@ -207,8 +211,7 @@ public class FrameReaderTextCSV extends FrameReader
         */
        protected Pair<Integer,Integer> computeCSVSize( Path path, JobConf job, 
FileSystem fs) 
                throws IOException 
-       {               
-               FileInputFormat.addInputPath(job, path);
+       {       
                TextInputFormat informat = new TextInputFormat();
                informat.configure(job);
                InputSplit[] splits = informat.getSplits(job, 1);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b5842fbe/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
index b998fce..cb7a05b 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -74,10 +73,9 @@ public class FrameReaderTextCSVParallel extends 
FrameReaderTextCSV
        {
                int numThreads = 
OptimizerUtils.getParallelTextReadParallelism();
                
-               FileInputFormat.addInputPath(job, path);
                TextInputFormat informat = new TextInputFormat();
                informat.configure(job);
-               InputSplit[] splits = informat.getSplits(job, numThreads);
+               InputSplit[] splits = informat.getSplits(job, numThreads); 
                splits = IOUtilFunctions.sortInputSplits(splits);
 
                try 
@@ -120,7 +118,6 @@ public class FrameReaderTextCSVParallel extends 
FrameReaderTextCSV
        {               
                int numThreads = 
OptimizerUtils.getParallelTextReadParallelism();
                
-               FileInputFormat.addInputPath(job, path);
                TextInputFormat informat = new TextInputFormat();
                informat.configure(job);
                InputSplit[] splits = informat.getSplits(job, numThreads);
@@ -177,7 +174,6 @@ public class FrameReaderTextCSVParallel extends 
FrameReaderTextCSV
                public Long call() 
                        throws Exception 
                {
-                       
                        RecordReader<LongWritable, Text> reader = 
_informat.getRecordReader(_split, _job, Reporter.NULL);
                        LongWritable key = new LongWritable();
                        Text value = new Text();
@@ -227,7 +223,7 @@ public class FrameReaderTextCSVParallel extends 
FrameReaderTextCSV
                public Object call() 
                        throws Exception 
                {
-                       readCSVFrameFrameFromInputSplit(_split, _informat, 
_job, _dest, _dest.getSchema(), 
+                       readCSVFrameFromInputSplit(_split, _informat, _job, 
_dest, _dest.getSchema(), 
                                        _dest.getColumnNames(), 
_dest.getNumRows(), _dest.getNumColumns(), _offset, _isFirstSplit);
                        return null;
                }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b5842fbe/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index 53d2001..4124c03 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -149,6 +149,14 @@ public class FrameBlock implements Writable, CacheBlock, 
Externalizable
        }
        
        /**
+        * 
+        * @param colnames
+        */
+       public void setColumnNames(List<String> colnames) {
+               _colnames = colnames;
+       }
+       
+       /**
         * Creates a mapping from column names to column IDs, i.e., 
         * 1-based column indexes
         * 

Reply via email to