[SYSTEMML-630] New parallel csv/textcell frame readers, cleanup/tests

This patch introduces parallel frame readers for csv and textcell. On a
1Mx1k (8GB binary, ~25-30GB in csv/text) scenario, multi-threaded read
led to the following performance improvements:

* Textcell: 930s (single-threaded) -> 83s (multi-threaded)
* CSV: 538s (single-threaded) -> 51s (multi-threaded) 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/dea42de1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/dea42de1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/dea42de1

Branch: refs/heads/master
Commit: dea42de1ff7e8ada41ad50e4e941f080f82d2346
Parents: 288438b
Author: Matthias Boehm <[email protected]>
Authored: Wed Jun 8 00:08:27 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Wed Jun 8 01:24:02 2016 -0700

----------------------------------------------------------------------
 .../io/FrameReaderBinaryBlockParallel.java      |  11 +-
 .../sysml/runtime/io/FrameReaderFactory.java    |  10 +-
 .../sysml/runtime/io/FrameReaderTextCSV.java    | 217 ++++++++++-------
 .../runtime/io/FrameReaderTextCSVParallel.java  | 235 +++++++++++++++++++
 .../sysml/runtime/io/FrameReaderTextCell.java   | 132 ++++++-----
 .../runtime/io/FrameReaderTextCellParallel.java | 114 +++++++++
 .../sysml/runtime/io/IOUtilFunctions.java       |  28 +++
 .../sysml/runtime/io/ReaderTextCSVParallel.java |  19 +-
 .../functions/frame/FrameReadWriteTest.java     |  89 +++++--
 9 files changed, 662 insertions(+), 193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/dea42de1/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
index d684af0..3571c37 100644
--- 
a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
+++ 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlockParallel.java
@@ -63,16 +63,14 @@ public class FrameReaderBinaryBlockParallel extends 
FrameReaderBinaryBlock
                        //create read tasks for all files
                        ExecutorService pool = 
Executors.newFixedThreadPool(numThreads);
                        ArrayList<ReadFileTask> tasks = new 
ArrayList<ReadFileTask>();
-                       for( Path lpath : getSequenceFilePaths(fs, path) ){
-                               ReadFileTask t = new ReadFileTask(lpath, job, 
fs, dest);
-                               tasks.add(t);
-                       }
+                       for( Path lpath : getSequenceFilePaths(fs, path) )
+                               tasks.add(new ReadFileTask(lpath, job, fs, 
dest));
 
                        //wait until all tasks have been executed
                        List<Future<Object>> rt = pool.invokeAll(tasks);        
                        pool.shutdown();
                        
-                       //check for exceptions and aggregate nnz
+                       //check for exceptions
                        for( Future<Object> task : rt )
                                task.get();
                } 
@@ -99,8 +97,7 @@ public class FrameReaderBinaryBlockParallel extends 
FrameReaderBinaryBlock
                }
 
                @Override
-               public Object call() throws Exception 
-               {
+               public Object call() throws Exception {
                        readBinaryBlockFrameFromSequenceFile(_path, _job, _fs, 
_dest);
                        return null;
                }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/dea42de1/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java
index 51606df..b768450 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderFactory.java
@@ -80,12 +80,18 @@ public class FrameReaderFactory
                FrameReader reader = null;
 
                if( iinfo == InputInfo.TextCellInputInfo ) {
-                       reader = new FrameReaderTextCell();
+                       if( 
ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_BINARYFORMATS)
 )
+                               reader = new FrameReaderTextCellParallel();
+                       else    
+                               reader = new FrameReaderTextCell();
                }
                else if( iinfo == InputInfo.CSVInputInfo ) {
                        if( props!=null && !(props instanceof 
CSVFileFormatProperties) )
                                throw new DMLRuntimeException("Wrong type of 
file format properties for CSV writer.");
-                       reader = new FrameReaderTextCSV( 
(CSVFileFormatProperties)props);
+                       if( 
ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_BINARYFORMATS)
 )
+                               reader = new FrameReaderTextCSVParallel( 
(CSVFileFormatProperties)props );
+                       else
+                               reader = new FrameReaderTextCSV( 
(CSVFileFormatProperties)props );
                }
                else if( iinfo == InputInfo.BinaryBlockInputInfo ) {
                        if( 
ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_READ_BINARYFORMATS)
 )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/dea42de1/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
index 2086ea3..d282aef 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
@@ -19,32 +19,37 @@
 
 package org.apache.sysml.runtime.io;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.matrix.CSVReblockMR;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.Pair;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
+/**
+ * Single-threaded frame text csv reader.
+ * 
+ */
 public class FrameReaderTextCSV extends FrameReader
 {
-
-       private CSVFileFormatProperties _props = null;
+       protected CSVFileFormatProperties _props = null;
        
-       public FrameReaderTextCSV(CSVFileFormatProperties props)
-       {
+       public FrameReaderTextCSV(CSVFileFormatProperties props) {
                _props = props;
        }
        
@@ -61,15 +66,10 @@ public class FrameReaderTextCSV extends FrameReader
         * @throws IOException 
         */
        @Override
-       public FrameBlock readFrameFromHDFS(String fname, List<ValueType> 
schema, List<String> names,
+       public final FrameBlock readFrameFromHDFS(String fname, List<ValueType> 
schema, List<String> names,
                        long rlen, long clen)
                throws IOException, DMLRuntimeException 
        {
-               //allocate output frame block
-               FrameBlock ret = null;
-               if( rlen>0 && clen>0 ) //otherwise CSV reblock based on file 
size for frame w/ unknown dimensions
-                       ret = createOutputFrameBlock(schema, names, rlen);
-               
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
                FileSystem fs = FileSystem.get(job);
@@ -77,13 +77,52 @@ public class FrameReaderTextCSV extends FrameReader
                
                //check existence and non-empty file
                checkValidInputFile(fs, path); 
+               
+               //compute size if necessary
+               if( rlen <= 0 || clen <= 0 ) {
+                       Pair<Integer,Integer> size = computeCSVSize(path, job, 
fs);
+                       rlen = size.getKey();
+                       clen = size.getValue();
+               }
+               
+               //allocate output frame block
+               List<ValueType> lschema = createOutputSchema(schema, clen);
+               List<String> lnames = createOutputNames(names, clen);
+               FrameBlock ret = createOutputFrameBlock(lschema, lnames, rlen);
        
-               //core read 
-               ret = readCSVFrameFromHDFS(path, job, fs, ret, schema, names, 
rlen, clen,  
-                                  _props.hasHeader(), _props.getDelim(), 
_props.isFill() );
+               //core read (sequential/parallel) 
+               readCSVFrameFromHDFS(path, job, fs, ret, lschema, lnames, rlen, 
clen);
                
                return ret;
        }
+
+       /**
+        * 
+        * @param path
+        * @param job
+        * @param fs
+        * @param dest
+        * @param schema
+        * @param names
+        * @param rlen
+        * @param clen
+        * @return
+        * @throws IOException 
+        */
+       protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem 
fs, 
+                       FrameBlock dest, List<ValueType> schema, List<String> 
names, long rlen, long clen) 
+               throws IOException
+       {
+               FileInputFormat.addInputPath(job, path);
+               TextInputFormat informat = new TextInputFormat();
+               informat.configure(job);
+               InputSplit[] splits = informat.getSplits(job, 1);
+               splits = IOUtilFunctions.sortInputSplits(splits);
+               for( int i=0; i<splits.length; i++ )
+                       readCSVFrameFrameFromInputSplit(splits[i], informat, 
job, dest, schema, names, rlen, clen, 0, i==0);
+       }
+       
+       
        
        /**
         * 
@@ -99,72 +138,60 @@ public class FrameReaderTextCSV extends FrameReader
         * @return
         * @throws IOException
         */
-       @SuppressWarnings("unchecked")
-       private FrameBlock readCSVFrameFromHDFS( Path path, JobConf job, 
FileSystem fs, FrameBlock dest, 
-                       List<ValueType> schema, List<String> names, long rlen, 
long clen, boolean hasHeader, String delim, boolean fill)
+       protected final void readCSVFrameFrameFromInputSplit( InputSplit split, 
TextInputFormat informat, JobConf job, 
+                       FrameBlock dest, List<ValueType> schema, List<String> 
names, long rlen, long clen, int rl, boolean first)
                throws IOException
        {
-               ArrayList<Path> files=new ArrayList<Path>();
-               if(fs.isDirectory(path)) {
-                       for(FileStatus stat: fs.listStatus(path, 
CSVReblockMR.hiddenFileFilter))
-                               files.add(stat.getPath());
-                       Collections.sort(files);
-               }
-               else
-                       files.add(path);
+               boolean hasHeader = _props.hasHeader();
+               boolean isFill = _props.isFill();
+               double dfillValue = _props.getFillValue();
+               String sfillValue = String.valueOf(_props.getFillValue());
+               String delim = _props.getDelim();
                
-               if ( dest == null ) {
-                       dest = computeCSVSize(files, fs, schema, names, 
hasHeader, delim);
-                       clen = dest.getNumColumns();
-               }
-               
-               /////////////////////////////////////////
-               String value = null;
-               int row = 0;
+               //create record reader
+               RecordReader<LongWritable, Text> reader = 
informat.getRecordReader(split, job, Reporter.NULL);
+               LongWritable key = new LongWritable();
+               Text value = new Text();
+               int row = rl;
                int col = -1;
                
-               for(int fileNo=0; fileNo<files.size(); fileNo++)
-               {
-                       BufferedReader br = new BufferedReader(new 
InputStreamReader(fs.open(files.get(fileNo))));
-                       if(fileNo==0 && hasHeader ) 
-                               br.readLine(); //ignore header
+               //handle header if existing
+               if(first && hasHeader ) 
+                       reader.next(key, value); //ignore header
                        
-                       // Read the data
-                       boolean emptyValuesFound = false;
-                       try
+               // Read the data
+               boolean emptyValuesFound = false;
+               try
+               {
+                       while( reader.next(key, value) ) //foreach line
                        {
-                               while( (value=br.readLine())!=null ) //foreach 
line
+                               String cellStr = value.toString().trim();
+                               emptyValuesFound = false; col = 0;
+                               String[] parts = IOUtilFunctions.split(cellStr, 
delim);
+                               
+                               for( String part : parts ) //foreach cell
                                {
-                                       String cellStr = 
value.toString().trim();
-                                       emptyValuesFound = false;
-                                       String[] parts = 
IOUtilFunctions.split(cellStr, delim);
-                                       col = 0;
-                                       
-                                       for( String part : parts ) //foreach 
cell
-                                       {
-                                               part = part.trim();
-                                               if ( part.isEmpty() ) {
-                                                       //TODO: Do we need to 
handle empty cell condition?
-                                                       emptyValuesFound = true;
-                                               }
-                                               else {
-                                                       dest.set(row, col, 
UtilFunctions.stringToObject(schema.get(col), part));
-                                               }
-                                               col++;
+                                       part = part.trim();
+                                       if ( part.isEmpty() ) {
+                                               if( isFill && dfillValue!=0 )
+                                                       dest.set(row, col, 
UtilFunctions.stringToObject(schema.get(col), sfillValue));
+                                               emptyValuesFound = true;
                                        }
-                                       
-                                       //sanity checks for empty values and 
number of columns
-                                       
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, fill, 
emptyValuesFound);
-                                       
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(path.toString(), cellStr, 
parts, clen);
-                                       row++;
+                                       else {
+                                               dest.set(row, col, 
UtilFunctions.stringToObject(schema.get(col), part));
+                                       }
+                                       col++;
                                }
-                       }
-                       finally {
-                               IOUtilFunctions.closeSilently(br);
+                               
+                               //sanity checks for empty values and number of 
columns
+                               
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, isFill, 
emptyValuesFound);
+                               
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns("", cellStr, parts, clen);
+                               row++;
                        }
                }
-               
-               return dest;
+               finally {
+                       IOUtilFunctions.closeSilently(reader);
+               }
        }
        
        /**
@@ -178,35 +205,45 @@ public class FrameReaderTextCSV extends FrameReader
         * @return
         * @throws IOException
         */
-       private FrameBlock computeCSVSize ( List<Path> files, FileSystem fs, 
List<ValueType> schema, List<String> names, boolean hasHeader, String delim) 
+       protected Pair<Integer,Integer> computeCSVSize( Path path, JobConf job, 
FileSystem fs) 
                throws IOException 
        {               
-               int nrow = 0;
-               for(int fileNo=0; fileNo<files.size(); fileNo++)
+               FileInputFormat.addInputPath(job, path);
+               TextInputFormat informat = new TextInputFormat();
+               informat.configure(job);
+               InputSplit[] splits = informat.getSplits(job, 1);
+               splits = IOUtilFunctions.sortInputSplits(splits);
+               
+               boolean first = true;
+               int ncol = -1;
+               int nrow = -1;
+               
+               for( InputSplit split : splits ) 
                {
-                       BufferedReader br = new BufferedReader(new 
InputStreamReader(fs.open(files.get(fileNo))));      
+                       RecordReader<LongWritable, Text> reader = 
informat.getRecordReader(split, job, Reporter.NULL);
+                       LongWritable key = new LongWritable();
+                       Text value = new Text();
+                       
                        try
                        {
-                               // Read the header line, if there is one.
-                               if(fileNo==0)
-                               {
-                                       if ( hasHeader ) 
-                                               br.readLine(); //ignore header
+                               //read head and first line to determine num 
columns
+                               if( first ) {
+                                       if ( _props.hasHeader() ) 
+                                               reader.next(key, value); 
//ignore header
+                                       reader.next(key, value);
+                                       ncol = 
StringUtils.countMatches(value.toString(), _props.getDelim()) + 1;
+                                       nrow = 1; first = false;
                                }
                                
-                               while ( br.readLine() != null ) {
+                               //count remaining number of rows
+                               while ( reader.next(key, value) )
                                        nrow++;
-                               }
                        }
                        finally {
-                               IOUtilFunctions.closeSilently(br);
+                               IOUtilFunctions.closeSilently(reader);
                        }
                }
                
-               //create new frame block
-               FrameBlock frameBlock = new FrameBlock(schema, names);
-               frameBlock.ensureAllocatedColumns(nrow);
-               return frameBlock;
+               return new Pair<Integer,Integer>(nrow, ncol);
        }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/dea42de1/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
new file mode 100644
index 0000000..b998fce
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.Pair;
+
+/**
+ * Multi-threaded frame text csv reader.
+ * 
+ */
+public class FrameReaderTextCSVParallel extends FrameReaderTextCSV
+{
+       public FrameReaderTextCSVParallel(CSVFileFormatProperties props) {
+               super(props);
+       }
+
+       /**
+        * 
+        * @param path
+        * @param job
+        * @param fs
+        * @param dest
+        * @param schema
+        * @param names
+        * @param rlen
+        * @param clen
+        * @return
+        * @throws IOException 
+        */
+       @Override
+       protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem 
fs, 
+                       FrameBlock dest, List<ValueType> schema, List<String> 
names, long rlen, long clen) 
+               throws IOException
+       {
+               int numThreads = 
OptimizerUtils.getParallelTextReadParallelism();
+               
+               FileInputFormat.addInputPath(job, path);
+               TextInputFormat informat = new TextInputFormat();
+               informat.configure(job);
+               InputSplit[] splits = informat.getSplits(job, numThreads);
+               splits = IOUtilFunctions.sortInputSplits(splits);
+
+               try 
+               {
+                       ExecutorService pool = 
Executors.newFixedThreadPool(numThreads);
+                       
+                       //compute num rows per split
+                       ArrayList<CountRowsTask> tasks = new 
ArrayList<CountRowsTask>();
+                       for( int i=0; i<splits.length; i++ )
+                               tasks.add(new CountRowsTask(splits[i], 
informat, job, _props.hasHeader(), i==0));
+                       List<Future<Long>> cret = pool.invokeAll(tasks);
+
+                       //compute row offset per split via cumsum on row counts
+                       long offset = 0;
+                       List<Long> offsets = new ArrayList<Long>();
+                       for( Future<Long> count : cret ) {
+                               offsets.add(offset);
+                               offset += count.get();
+                       }
+                       
+                       //read individial splits
+                       ArrayList<ReadRowsTask> tasks2 = new 
ArrayList<ReadRowsTask>();
+                       for( int i=0; i<splits.length; i++ )
+                               tasks2.add( new ReadRowsTask(splits[i], 
informat, job, dest, offsets.get(i).intValue(), i==0));
+                       List<Future<Object>> rret = pool.invokeAll(tasks2);
+                       pool.shutdown();
+                       
+                       //error handling
+                       for( Future<Object> read : rret )
+                               read.get();
+               } 
+               catch (Exception e) {
+                       throw new IOException("Failed parallel read of text csv 
input.", e);
+               }
+       }
+
+       @Override
+       protected Pair<Integer,Integer> computeCSVSize( Path path, JobConf job, 
FileSystem fs) 
+               throws IOException 
+       {               
+               int numThreads = 
OptimizerUtils.getParallelTextReadParallelism();
+               
+               FileInputFormat.addInputPath(job, path);
+               TextInputFormat informat = new TextInputFormat();
+               informat.configure(job);
+               InputSplit[] splits = informat.getSplits(job, numThreads);
+               
+               //compute number of columns
+               RecordReader<LongWritable, Text> reader = 
informat.getRecordReader(splits[0], job, Reporter.NULL);
+               LongWritable key = new LongWritable();
+               Text value = new Text();
+               reader.next(key, value);
+               int ncol = StringUtils.countMatches(value.toString(), 
_props.getDelim()) + 1;
+               reader.close();
+               
+               //compute number of rows
+               ExecutorService pool = Executors.newFixedThreadPool(numThreads);
+               
+               //compute num rows per split
+               int nrow = 0;
+               try {
+                       ArrayList<CountRowsTask> tasks = new 
ArrayList<CountRowsTask>();
+                       for( int i=0; i<splits.length; i++ )
+                               tasks.add(new CountRowsTask(splits[i], 
informat, job, _props.hasHeader(), i==0));
+                       List<Future<Long>> cret = pool.invokeAll(tasks);
+                       for( Future<Long> count : cret ) 
+                               nrow += count.get().intValue();
+               }
+               catch (Exception e) {
+                       throw new IOException("Failed parallel read of text csv 
input.", e);
+               }
+               
+               return new Pair<Integer,Integer>(nrow, ncol);
+       }
+       
+       /**
+        * 
+        * 
+        */
+       private static class CountRowsTask implements Callable<Long> 
+       {
+               private InputSplit _split = null;
+               private TextInputFormat _informat = null;
+               private JobConf _job = null;
+               private boolean _hasHeader = false;
+               private boolean _firstSplit = false;
+
+               public CountRowsTask(InputSplit split, TextInputFormat 
informat, JobConf job, boolean hasHeader, boolean first) {
+                       _split = split;
+                       _informat = informat;
+                       _job = job;
+                       _hasHeader = hasHeader;
+                       _firstSplit = first;
+               }
+
+               @Override
+               public Long call() 
+                       throws Exception 
+               {
+                       
+                       RecordReader<LongWritable, Text> reader = 
_informat.getRecordReader(_split, _job, Reporter.NULL);
+                       LongWritable key = new LongWritable();
+                       Text value = new Text();
+                       long nrows = 0;
+                       
+                       // count rows from the first non-header row
+                       try {
+                               if ( _firstSplit && _hasHeader )
+                                       reader.next(key, value);
+                               while (reader.next(key, value))
+                                       nrows++;
+                       } 
+                       finally {
+                               IOUtilFunctions.closeSilently(reader);
+                       }
+
+                       return nrows;
+               }
+       }
+
+       /**
+        * 
+        * 
+        */
+       private class ReadRowsTask implements Callable<Object> 
+       {
+               private InputSplit _split = null;
+               private TextInputFormat _informat = null;
+               private JobConf _job = null;
+               private FrameBlock _dest = null;
+               private int _offset = -1;
+               private boolean _isFirstSplit = false;
+               
+               
+               public ReadRowsTask(InputSplit split, TextInputFormat informat, 
JobConf job, 
+                               FrameBlock dest, int offset, boolean first) 
+               {
+                       _split = split;
+                       _informat = informat;
+                       _job = job;
+                       _dest = dest;
+                       _offset = offset;
+                       _isFirstSplit = first;
+               }
+
+               @Override
+               public Object call() 
+                       throws Exception 
+               {
+                       readCSVFrameFrameFromInputSplit(_split, _informat, 
_job, _dest, _dest.getSchema(), 
+                                       _dest.getColumnNames(), 
_dest.getNumRows(), _dest.getNumColumns(), _offset, _isFirstSplit);
+                       return null;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/dea42de1/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
index 2f42a64..8345b47 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
@@ -42,9 +42,12 @@ import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.util.FastStringTokenizer;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
+/**
+ * Single-threaded frame textcell reader.
+ * 
+ */
 public class FrameReaderTextCell extends FrameReader
 {
-
        /**
         * 
         * @param fname
@@ -57,12 +60,13 @@ public class FrameReaderTextCell extends FrameReader
         * @throws IOException 
         */
        @Override
-       public FrameBlock readFrameFromHDFS(String fname, List<ValueType> 
schema, List<String> names,
-                       long rlen, long clen)
-                       throws IOException, DMLRuntimeException
+       public final FrameBlock readFrameFromHDFS(String fname, List<ValueType> 
schema, List<String> names, long rlen, long clen)
+               throws IOException, DMLRuntimeException
        {
                //allocate output frame block
-               FrameBlock ret = createOutputFrameBlock(schema, names, rlen);
+               List<ValueType> lschema = createOutputSchema(schema, clen);
+               List<String> lnames = createOutputNames(names, clen);
+               FrameBlock ret = createOutputFrameBlock(lschema, lnames, rlen);
                
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());     
@@ -72,11 +76,8 @@ public class FrameReaderTextCell extends FrameReader
                //check existence and non-empty file
                checkValidInputFile(fs, path); 
        
-               //core read 
-               if( fs.isDirectory(path) )
-                       readTextCellFrameFromHDFS(path, job, ret, schema, 
names, rlen, clen);
-               else
-                       readRawTextCellFrameFromHDFS(path, job, fs, ret, 
schema, names, rlen, clen);
+               //core read (sequential/parallel)
+               readTextCellFrameFromHDFS(path, job, fs, ret, lschema, lnames, 
rlen, clen);
                
                return ret;
        }
@@ -90,7 +91,7 @@ public class FrameReaderTextCell extends FrameReader
         * @throws IOException
         * @throws DMLRuntimeException
         */
-       public FrameBlock readFrameFromInputStream(InputStream is, long rlen, 
long clen) 
+       public final FrameBlock readFrameFromInputStream(InputStream is, long 
rlen, long clen) 
                throws IOException, DMLRuntimeException {
                return readFrameFromInputStream(is, getDefSchema(clen), 
getDefColNames(clen), rlen, clen);
        }
@@ -106,83 +107,98 @@ public class FrameReaderTextCell extends FrameReader
         * @throws DMLRuntimeException 
         * @throws IOException 
         */
-       public FrameBlock readFrameFromInputStream(InputStream is, 
List<ValueType> schema, List<String> names, long rlen, long clen) 
+       public final FrameBlock readFrameFromInputStream(InputStream is, 
List<ValueType> schema, List<String> names, long rlen, long clen) 
                throws IOException, DMLRuntimeException 
        {
                //allocate output frame block
-               FrameBlock ret = createOutputFrameBlock(schema, names, rlen);
+               List<ValueType> lschema = createOutputSchema(schema, clen);
+               List<String> lnames = createOutputNames(names, clen);
+               FrameBlock ret = createOutputFrameBlock(lschema, lnames, rlen);
        
                //core read 
-               readRawTextCellFrameFromInputStream(is, ret, schema, names, 
rlen, clen);
+               readRawTextCellFrameFromInputStream(is, ret, lschema, lnames, 
rlen, clen);
                
                return ret;
        }
        
-
        /**
         * 
         * @param path
         * @param job
+        * @param fs
         * @param dest
         * @param schema
         * @param names
         * @param rlen
         * @param clen
-        * @return
         * @throws IOException
         */
-       private void readTextCellFrameFromHDFS( Path path, JobConf job, 
FrameBlock dest, 
+       protected void readTextCellFrameFromHDFS( Path path, JobConf job, 
FileSystem fs, FrameBlock dest, 
                        List<ValueType> schema, List<String> names, long rlen, 
long clen)
                throws IOException
        {
-               FileInputFormat.addInputPath(job, path);
-               TextInputFormat informat = new TextInputFormat();
-               informat.configure(job);
-               InputSplit[] splits = informat.getSplits(job, 1);
+               if( fs.isDirectory(path) ) {
+                       FileInputFormat.addInputPath(job, path);
+                       TextInputFormat informat = new TextInputFormat();
+                       informat.configure(job);
+                       InputSplit[] splits = informat.getSplits(job, 1);
+                       for(InputSplit split: splits)
+                               readTextCellFrameFromInputSplit(split, 
informat, job, dest);
+               }
+               else {
+                       readRawTextCellFrameFromHDFS(path, job, fs, dest, 
schema, names, rlen, clen);
+               }
+       }
+
+       /**
+        * 
+        * @param split
+        * @param dest
+        * @param schema
+        * @param names
+        * @param rlen
+        * @param clen
+        * @throws IOException
+        */
+       protected final void readTextCellFrameFromInputSplit( InputSplit split, 
TextInputFormat informat, JobConf job, FrameBlock dest)
+               throws IOException
+       {
+               List<ValueType> schema = dest.getSchema();
+               int rlen = dest.getNumRows();
+               int clen = dest.getNumColumns();
+               
+               //create record reader
+               RecordReader<LongWritable,Text> reader = 
informat.getRecordReader(split, job, Reporter.NULL);
                
                LongWritable key = new LongWritable();
                Text value = new Text();
+               FastStringTokenizer st = new FastStringTokenizer(' ');
                int row = -1;
                int col = -1;
                
                try
                {
-                       FastStringTokenizer st = new FastStringTokenizer(' ');
-                       
-                       for(InputSplit split: splits)
-                       {
-                               RecordReader<LongWritable,Text> reader = 
informat.getRecordReader(split, job, Reporter.NULL);
-                       
-                               try
-                               {
-                                       while( reader.next(key, value) )
-                                       {
-                                               st.reset( value.toString() ); 
//reinit tokenizer
-                                               row = st.nextInt()-1;
-                                               col = st.nextInt()-1;
-                                               dest.set(row, col, 
UtilFunctions.stringToObject(schema.get(col), st.nextToken()));
-                                       }
-                               }
-                               finally
-                               {
-                                       if( reader != null )
-                                               reader.close();
-                               }
+                       while( reader.next(key, value) ) {
+                               st.reset( value.toString() ); //reinit tokenizer
+                               row = st.nextInt()-1;
+                               col = st.nextInt()-1;
+                               dest.set(row, col, 
UtilFunctions.stringToObject(schema.get(col), st.nextToken()));
                        }
                }
-               catch(Exception ex)
+               catch(Exception ex) 
                {
                        //post-mortem error handling and bounds checking
-                       if( row < 0 || row + 1 > rlen || col < 0 || col + 1 > 
clen )
-                       {
+                       if( row < 0 || row + 1 > rlen || col < 0 || col + 1 > 
clen ) {
                                throw new IOException("Frame cell 
["+(row+1)+","+(col+1)+"] " +
                                                                          "out 
of overall frame range [1:"+rlen+",1:"+clen+"].");
                        }
-                       else
-                       {
+                       else {
                                throw new IOException( "Unable to read frame in 
text cell format.", ex );
                        }
                }
+               finally {
+                       IOUtilFunctions.closeSilently(reader);
+               }               
        }
 
        
@@ -199,7 +215,7 @@ public class FrameReaderTextCell extends FrameReader
         * @return
         * @throws IOException
         */
-       private void readRawTextCellFrameFromHDFS( Path path, JobConf job, 
FileSystem fs, FrameBlock dest, 
+       protected final void readRawTextCellFrameFromHDFS( Path path, JobConf 
job, FileSystem fs, FrameBlock dest, 
                        List<ValueType> schema, List<String> names, long rlen, 
long clen)
                throws IOException
        {
@@ -221,21 +237,20 @@ public class FrameReaderTextCell extends FrameReader
         * @return
         * @throws IOException
         */
-       private void readRawTextCellFrameFromInputStream( InputStream is, 
FrameBlock dest, List<ValueType> schema, List<String> names, long rlen, long 
clen)
-                       throws IOException
+       protected final void readRawTextCellFrameFromInputStream( InputStream 
is, FrameBlock dest, List<ValueType> schema, List<String> names, long rlen, 
long clen)
+               throws IOException
        {
+               //create buffered reader
                BufferedReader br = new BufferedReader(new InputStreamReader( 
is ));    
                
                String value = null;
+               FastStringTokenizer st = new FastStringTokenizer(' ');
                int row = -1;
                int col = -1;
                
                try
                {                       
-                       FastStringTokenizer st = new FastStringTokenizer(' ');
-                       
-                       while( (value=br.readLine())!=null )
-                       {
+                       while( (value=br.readLine())!=null ) {
                                st.reset( value ); //reinit tokenizer
                                row = st.nextInt()-1;
                                col = st.nextInt()-1;   
@@ -245,18 +260,15 @@ public class FrameReaderTextCell extends FrameReader
                catch(Exception ex)
                {
                        //post-mortem error handling and bounds checking
-                       if( row < 0 || row + 1 > rlen || col < 0 || col + 1 > 
clen ) 
-                       {
+                       if( row < 0 || row + 1 > rlen || col < 0 || col + 1 > 
clen ) {
                                throw new IOException("Frame cell 
["+(row+1)+","+(col+1)+"] " +
                                                                          "out 
of overall frame range [1:"+rlen+",1:"+clen+"].", ex);
                        }
-                       else
-                       {
+                       else {
                                throw new IOException( "Unable to read frame in 
raw text cell format.", ex );
                        }
                }
-               finally
-               {
+               finally {
                        IOUtilFunctions.closeSilently(br);
                }
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/dea42de1/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCellParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCellParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCellParallel.java
new file mode 100644
index 0000000..ce3993f
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCellParallel.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.parser.Expression.ValueType;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+
+/**
+ * Multi-threaded frame textcell reader.
+ * 
+ */
+public class FrameReaderTextCellParallel extends FrameReaderTextCell
+{      
+       /**
+        * 
+        * @param path
+        * @param job
+        * @param fs
+        * @param dest
+        * @param schema
+        * @param names
+        * @param rlen
+        * @param clen
+        * @throws IOException
+        */
+       @Override
+       protected void readTextCellFrameFromHDFS( Path path, JobConf job, 
FileSystem fs, FrameBlock dest, 
+                       List<ValueType> schema, List<String> names, long rlen, 
long clen)
+               throws IOException
+       {
+               int numThreads = 
OptimizerUtils.getParallelTextReadParallelism();
+               
+               FileInputFormat.addInputPath(job, path);
+               TextInputFormat informat = new TextInputFormat();
+               informat.configure(job);
+               
+               try 
+               {
+                       //create read tasks for all splits
+                       ExecutorService pool = 
Executors.newFixedThreadPool(numThreads);
+                       InputSplit[] splits = informat.getSplits(job, 
numThreads);
+                       ArrayList<ReadTask> tasks = new ArrayList<ReadTask>();
+                       for( InputSplit split : splits )
+                               tasks.add(new ReadTask(split, informat, job, 
dest));
+                       
+                       //wait until all tasks have been executed
+                       List<Future<Object>> rt = pool.invokeAll(tasks);        
+                       pool.shutdown();
+                               
+                       //check for exceptions
+                       for( Future<Object> task : rt )
+                               task.get();
+               } 
+               catch (Exception e) {
+                       throw new IOException("Failed parallel read of text 
cell input.", e);
+               }
+       }
+       
+       /**
+        * 
+        */
+       public class ReadTask implements Callable<Object> 
+       {
+               private InputSplit _split = null;
+               private TextInputFormat _informat = null;
+               private JobConf _job = null;
+               private FrameBlock _dest = null;
+               
+               public ReadTask( InputSplit split, TextInputFormat informat, 
JobConf job, FrameBlock dest ) {
+                       _split = split;
+                       _informat = informat;
+                       _job = job;
+                       _dest = dest;
+               }
+
+               @Override
+               public Object call() throws Exception {
+                       readTextCellFrameFromInputSplit(_split, _informat, 
_job, _dest);
+                       return null;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/dea42de1/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java 
b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
index 69efec0..93a8818 100644
--- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
@@ -24,10 +24,15 @@ import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Comparator;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.sysml.runtime.util.LocalFileUtils;
 import org.apache.sysml.runtime.util.UtilFunctions;
@@ -156,4 +161,27 @@ public class IOUtilFunctions
                input.close();          
                return bos.toString("UTF-8");
        }
+
+       /**
+        * 
+        * @param splits
+        * @return
+        */
+       public static InputSplit[] sortInputSplits(InputSplit[] splits) {
+               if (splits[0] instanceof FileSplit) {
+                       // The splits do not always arrive in order by file 
name.
+                       // Sort the splits lexicographically by path so that 
the header will
+                       // be in the first split.
+                       // Note that we're assuming that the splits come in 
order by offset
+                       Arrays.sort(splits, new Comparator<InputSplit>() {
+                               @Override
+                               public int compare(InputSplit o1, InputSplit 
o2) {
+                                       Path p1 = ((FileSplit) o1).getPath();
+                                       Path p2 = ((FileSplit) o2).getPath();
+                                       return 
p1.toString().compareTo(p2.toString());
+                               }
+                       });
+               }               
+               return splits;
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/dea42de1/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
index 64c055c..8041ba7 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
@@ -21,8 +21,6 @@ package org.apache.sysml.runtime.io;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -33,7 +31,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -84,21 +81,7 @@ public class ReaderTextCSVParallel extends MatrixReader
                informat.configure(job);
 
                InputSplit[] splits = informat.getSplits(job, _numThreads);
-
-               if (splits[0] instanceof FileSplit) {
-                       // The splits do not always arrive in order by file 
name.
-                       // Sort the splits lexicographically by path so that 
the header will
-                       // be in the first split.
-                       // Note that we're assuming that the splits come in 
order by offset
-                       Arrays.sort(splits, new Comparator<InputSplit>() {
-                               @Override
-                               public int compare(InputSplit o1, InputSplit 
o2) {
-                                       Path p1 = ((FileSplit) o1).getPath();
-                                       Path p2 = ((FileSplit) o2).getPath();
-                                       return 
p1.toString().compareTo(p2.toString());
-                               }
-                       });
-               }
+               splits = IOUtilFunctions.sortInputSplits(splits);
 
                // check existence and non-empty file
                checkValidInputFile(fs, path);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/dea42de1/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java
index b07d09e..7d45ebc 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameReadWriteTest.java
@@ -62,35 +62,94 @@ public class FrameReadWriteTest extends AutomatedTestBase
        }
 
        @Test
-       public void testFrameStringsStrings()  {
-               runFrameReadWriteTest(schemaStrings, schemaStrings, false);
+       public void testFrameStringsStringsBinary()  {
+               runFrameReadWriteTest(OutputInfo.BinaryBlockOutputInfo, 
schemaStrings, schemaStrings, false);
        }
        
        @Test
-       public void testFrameStringsStringsParallel()  { 
-               runFrameReadWriteTest(schemaStrings, schemaStrings, true);
+       public void testFrameStringsStringsBinaryParallel()  { 
+               runFrameReadWriteTest(OutputInfo.BinaryBlockOutputInfo, 
schemaStrings, schemaStrings, true);
        }
        
        @Test
-       public void testFrameMixedStrings()  {
-               runFrameReadWriteTest(schemaMixed, schemaStrings, false);
+       public void testFrameMixedStringsBinary()  {
+               runFrameReadWriteTest(OutputInfo.BinaryBlockOutputInfo, 
schemaMixed, schemaStrings, false);
        }
        
        @Test
-       public void testFrameStringsMixedParallel()  {
-               runFrameReadWriteTest(schemaStrings, schemaMixed, true);
+       public void testFrameStringsMixedBinaryParallel()  {
+               runFrameReadWriteTest(OutputInfo.BinaryBlockOutputInfo, 
schemaStrings, schemaMixed, true);
        }
        
        @Test
-       public void testFrameMixedMixed()  {
-               runFrameReadWriteTest(schemaMixed, schemaMixed, false);
+       public void testFrameMixedMixedBinary()  {
+               runFrameReadWriteTest(OutputInfo.BinaryBlockOutputInfo, 
schemaMixed, schemaMixed, false);
        }
        
        @Test
-       public void testFrameMixedMixedParallel()  {
-               runFrameReadWriteTest(schemaMixed, schemaMixed, true);
+       public void testFrameMixedMixedBinaryParallel()  {
+               runFrameReadWriteTest(OutputInfo.BinaryBlockOutputInfo, 
schemaMixed, schemaMixed, true);
        }
 
+       @Test
+       public void testFrameStringsStringsTextCell()  {
+               runFrameReadWriteTest(OutputInfo.TextCellOutputInfo, 
schemaStrings, schemaStrings, false);
+       }
+       
+       @Test
+       public void testFrameStringsStringsTextCellParallel()  { 
+               runFrameReadWriteTest(OutputInfo.TextCellOutputInfo, 
schemaStrings, schemaStrings, true);
+       }
+       
+       @Test
+       public void testFrameMixedStringsTextCell()  {
+               runFrameReadWriteTest(OutputInfo.TextCellOutputInfo, 
schemaMixed, schemaStrings, false);
+       }
+       
+       @Test
+       public void testFrameStringsMixedTextCellParallel()  {
+               runFrameReadWriteTest(OutputInfo.TextCellOutputInfo, 
schemaStrings, schemaMixed, true);
+       }
+       
+       @Test
+       public void testFrameMixedMixedTextCell()  {
+               runFrameReadWriteTest(OutputInfo.TextCellOutputInfo, 
schemaMixed, schemaMixed, false);
+       }
+       
+       @Test
+       public void testFrameMixedMixedTextCellParallel()  {
+               runFrameReadWriteTest(OutputInfo.TextCellOutputInfo, 
schemaMixed, schemaMixed, true);
+       }
+
+       @Test
+       public void testFrameStringsStringsTextCSV()  {
+               runFrameReadWriteTest(OutputInfo.CSVOutputInfo, schemaStrings, 
schemaStrings, false);
+       }
+       
+       @Test
+       public void testFrameStringsStringsTextCSVParallel()  { 
+               runFrameReadWriteTest(OutputInfo.CSVOutputInfo, schemaStrings, 
schemaStrings, true);
+       }
+       
+       @Test
+       public void testFrameMixedStringsTextCSV()  {
+               runFrameReadWriteTest(OutputInfo.CSVOutputInfo, schemaMixed, 
schemaStrings, false);
+       }
+       
+       @Test
+       public void testFrameStringsMixedTextCSVParallel()  {
+               runFrameReadWriteTest(OutputInfo.CSVOutputInfo, schemaStrings, 
schemaMixed, true);
+       }
+       
+       @Test
+       public void testFrameMixedMixedTextCSV()  {
+               runFrameReadWriteTest(OutputInfo.CSVOutputInfo, schemaMixed, 
schemaMixed, false);
+       }
+       
+       @Test
+       public void testFrameMixedMixedTextCSVParallel()  {
+               runFrameReadWriteTest(OutputInfo.CSVOutputInfo, schemaMixed, 
schemaMixed, true);
+       }
        
        /**
         * 
@@ -98,7 +157,7 @@ public class FrameReadWriteTest extends AutomatedTestBase
         * @param sparseM2
         * @param instType
         */
-       private void runFrameReadWriteTest( ValueType[] schema1, ValueType[] 
schema2, boolean parallel)
+       private void runFrameReadWriteTest( OutputInfo oinfo, ValueType[] 
schema1, ValueType[] schema2, boolean parallel)
        {
                boolean oldParText = CompilerConfig.FLAG_PARREADWRITE_TEXT;
                boolean oldParBin = CompilerConfig.FLAG_PARREADWRITE_BINARY;
@@ -129,9 +188,7 @@ public class FrameReadWriteTest extends AutomatedTestBase
                        fprop.setDelim(DELIMITER);
                        fprop.setHeader(HEADER);
                        
-                       writeAndVerifyData(OutputInfo.TextCellOutputInfo, 
frame1, frame2, fprop);
-                       writeAndVerifyData(OutputInfo.CSVOutputInfo, frame1, 
frame2, fprop);
-                       writeAndVerifyData(OutputInfo.BinaryBlockOutputInfo, 
frame1, frame2, fprop);
+                       writeAndVerifyData(oinfo, frame1, frame2, fprop);
                }
                catch(Exception ex) {
                        ex.printStackTrace();


Reply via email to