Repository: incubator-systemml
Updated Branches:
  refs/heads/master 1d1a9fa40 -> 23709ec60


[SYSTEMML-1464] Fix missing matrix/frame csv read from input streams

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/23709ec6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/23709ec6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/23709ec6

Branch: refs/heads/master
Commit: 23709ec6088af53163147eab72b2b9c06a3a637c
Parents: 1d1a9fa
Author: Matthias Boehm <[email protected]>
Authored: Wed Apr 5 00:14:10 2017 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Wed Apr 5 00:14:10 2017 -0700

----------------------------------------------------------------------
 .../org/apache/sysml/api/jmlc/Connection.java   | 106 +++++++----
 .../apache/sysml/runtime/io/FrameReader.java    |  17 +-
 .../runtime/io/FrameReaderBinaryBlock.java      |   9 +-
 .../sysml/runtime/io/FrameReaderTextCSV.java    |  23 ++-
 .../sysml/runtime/io/FrameReaderTextCell.java   |   7 +-
 .../apache/sysml/runtime/io/MatrixReader.java   |   4 +
 .../sysml/runtime/io/ReaderBinaryBlock.java     |   8 +
 .../sysml/runtime/io/ReaderBinaryCell.java      |   9 +-
 .../apache/sysml/runtime/io/ReaderTextCSV.java  | 189 +++++++++++--------
 .../sysml/runtime/io/ReaderTextCSVParallel.java |   8 +
 .../apache/sysml/runtime/io/ReaderTextCell.java |   1 +
 .../runtime/io/ReaderTextCellParallel.java      |   8 +
 .../runtime/util/InputStreamInputFormat.java    |  96 ++++++++++
 .../functions/jmlc/JMLCInputStreamReadTest.java | 183 ++++++++++++++++++
 .../functions/jmlc/ZPackageSuite.java           |   1 +
 15 files changed, 540 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/api/jmlc/Connection.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/jmlc/Connection.java 
b/src/main/java/org/apache/sysml/api/jmlc/Connection.java
index a3d7ae7..5240dc4 100644
--- a/src/main/java/org/apache/sysml/api/jmlc/Connection.java
+++ b/src/main/java/org/apache/sysml/api/jmlc/Connection.java
@@ -50,11 +50,9 @@ import org.apache.sysml.runtime.controlprogram.Program;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.io.FrameReader;
 import org.apache.sysml.runtime.io.FrameReaderFactory;
-import org.apache.sysml.runtime.io.FrameReaderTextCell;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.io.MatrixReader;
 import org.apache.sysml.runtime.io.MatrixReaderFactory;
-import org.apache.sysml.runtime.io.ReaderTextCell;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -324,11 +322,11 @@ public class Connection implements Closeable
        }
        
        /**
-        * Converts an input string representation of a matrix in textcell 
format
+        * Converts an input string representation of a matrix in csv or 
textcell format
         * into a dense double array. The meta data string is the SystemML 
generated
         * .mtd file including the number of rows and columns.
         * 
-        * @param input string matrix in textcell format
+        * @param input string matrix in csv or textcell format
         * @param meta string representing SystemML matrix metadata in JSON 
format
         * @return matrix as a two-dimensional double array
         * @throws IOException if IOException occurs
@@ -342,15 +340,10 @@ public class Connection implements Closeable
                        int rows = jmtd.getInt(DataExpression.READROWPARAM);
                        int cols = jmtd.getInt(DataExpression.READCOLPARAM);
                        String format = 
jmtd.getString(DataExpression.FORMAT_TYPE);
-       
-                       //sanity check input format
-                       
if(!(DataExpression.FORMAT_TYPE_VALUE_TEXT.equals(format)
-                               
||DataExpression.FORMAT_TYPE_VALUE_MATRIXMARKET.equals(format))) {
-                               throw new IOException("Invalid input format 
(expected: text or mm): "+format);
-                       }
                        
                        //parse the input matrix
-                       return convertToDoubleMatrix(input, rows, cols);
+                       InputStream is = IOUtilFunctions.toInputStream(input);
+                       return convertToDoubleMatrix(is, rows, cols, format);
                }
                catch(Exception ex) {
                        throw new IOException(ex);
@@ -359,9 +352,7 @@ public class Connection implements Closeable
        
        /**
         * Converts an input string representation of a matrix in textcell 
format
-        * into a dense double array. The number of rows and columns need to be 
-        * specified because textcell only represents non-zero values and hence
-        * does not define the dimensions in the general case.
+        * into a dense double array. 
         * 
         * @param input string matrix in textcell format
         * @param rows number of rows in the matrix
@@ -378,9 +369,7 @@ public class Connection implements Closeable
        
        /**
         * Converts an input stream of a string matrix in textcell format
-        * into a dense double array. The number of rows and columns need to be 
-        * specified because textcell only represents non-zero values and hence
-        * does not define the dimensions in the general case.
+        * into a dense double array. 
         * 
         * @param input InputStream to a string matrix in textcell format
         * @param rows number of rows in the matrix
@@ -388,15 +377,40 @@ public class Connection implements Closeable
         * @return matrix as a two-dimensional double array
         * @throws IOException if IOException occurs
         */
-       public double[][] convertToDoubleMatrix(InputStream input, int rows, 
int cols) 
+       public double[][] convertToDoubleMatrix(InputStream input, int rows, 
int cols) throws IOException {
+               return convertToDoubleMatrix(input, rows, cols, 
DataExpression.FORMAT_TYPE_VALUE_TEXT);
+       }
+       
+       /**
+        * Converts an input stream of a string matrix in csv or textcell format
+        * into a dense double array. 
+        * 
+        * @param input InputStream to a string matrix in csv or textcell format
+        * @param rows number of rows in the matrix
+        * @param cols number of columns in the matrix
+        * @param format input format of the given stream
+        * @return matrix as a two-dimensional double array
+        * @throws IOException if IOException occurs
+        */
+       public double[][] convertToDoubleMatrix(InputStream input, int rows, 
int cols, String format) 
                throws IOException
        {
                double[][] ret = null;
+
+               //sanity check input format
+               if(!(DataExpression.FORMAT_TYPE_VALUE_TEXT.equals(format)
+                       
||DataExpression.FORMAT_TYPE_VALUE_MATRIXMARKET.equals(format)
+                       ||DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format)) 
) {
+                       throw new IOException("Invalid input format (expected: 
csv, text or mm): "+format);
+               }
                
                try {
                        //read input matrix
-                       ReaderTextCell reader = 
(ReaderTextCell)MatrixReaderFactory.createMatrixReader(InputInfo.TextCellInputInfo);
-                       MatrixBlock mb = 
reader.readMatrixFromInputStream(input, rows, cols, 
ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize(), 
(long)rows*cols);
+                       InputInfo iinfo = 
DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format) ? 
+                                       InputInfo.CSVInputInfo : 
InputInfo.TextCellInputInfo;
+                       MatrixReader reader = 
MatrixReaderFactory.createMatrixReader(iinfo);
+                       MatrixBlock mb = 
reader.readMatrixFromInputStream(input, rows, cols, 
+                                       ConfigurationManager.getBlocksize(), 
ConfigurationManager.getBlocksize(), (long)rows*cols);
                
                        //convert to double array
                        ret = DataConverter.convertToDoubleMatrix( mb );
@@ -467,11 +481,11 @@ public class Connection implements Closeable
        }
        
        /**
-        * Converts an input string representation of a frame in textcell format
+        * Converts an input string representation of a frame in csv or 
textcell format
         * into a dense string array. The meta data string is the SystemML 
generated
         * .mtd file including the number of rows and columns.
         * 
-        * @param input string frame in textcell format
+        * @param input string frame in csv or textcell format
         * @param meta string representing SystemML frame metadata in JSON 
format
         * @return frame as a two-dimensional string array
         * @throws IOException if IOException occurs
@@ -485,15 +499,10 @@ public class Connection implements Closeable
                        int rows = jmtd.getInt(DataExpression.READROWPARAM);
                        int cols = jmtd.getInt(DataExpression.READCOLPARAM);
                        String format = 
jmtd.getString(DataExpression.FORMAT_TYPE);
-       
-                       //sanity check input format
-                       
if(!(DataExpression.FORMAT_TYPE_VALUE_TEXT.equals(format)
-                               
||DataExpression.FORMAT_TYPE_VALUE_MATRIXMARKET.equals(format))) {
-                               throw new IOException("Invalid input format 
(expected: text or mm): "+format);
-                       }
                        
                        //parse the input frame
-                       return convertToStringFrame(input, rows, cols);
+                       InputStream is = IOUtilFunctions.toInputStream(input);
+                       return convertToStringFrame(is, rows, cols, format);
                }
                catch(Exception ex) {
                        throw new IOException(ex);
@@ -502,9 +511,7 @@ public class Connection implements Closeable
        
        /**
         * Converts an input string representation of a frame in textcell format
-        * into a dense string array. The number of rows and columns need to be 
-        * specified because textcell only represents non-zero values and hence
-        * does not define the dimensions in the general case.
+        * into a dense string array. 
         * 
         * @param input string frame in textcell format
         * @param rows number of rows in the frame
@@ -521,9 +528,7 @@ public class Connection implements Closeable
        
        /**
         * Converts an input stream of a string frame in textcell format
-        * into a dense string array. The number of rows and columns need to be 
-        * specified because textcell only represents non-zero values and hence
-        * does not define the dimensions in the general case.
+        * into a dense string array. 
         * 
         * @param input InputStream to a string frame in textcell format
         * @param rows number of rows in the frame
@@ -531,14 +536,38 @@ public class Connection implements Closeable
         * @return frame as a two-dimensional string array
         * @throws IOException if IOException occurs
         */
-       public String[][] convertToStringFrame(InputStream input, int rows, int 
cols) 
+       public String[][] convertToStringFrame(InputStream input, int rows, int 
cols) throws IOException {
+               return convertToStringFrame(input, rows, cols, 
DataExpression.FORMAT_TYPE_VALUE_TEXT);
+       }
+       
+       /**
+        * Converts an input stream of a string frame in csv or textcell format
+        * into a dense string array. 
+        * 
+        * @param input InputStream to a string frame in csv or textcell format
+        * @param rows number of rows in the frame
+        * @param cols number of columns in the frame
+        * @param format input format of the given stream
+        * @return frame as a two-dimensional string array
+        * @throws IOException if IOException occurs
+        */
+       public String[][] convertToStringFrame(InputStream input, int rows, int 
cols, String format) 
                throws IOException
        {
                String[][] ret = null;
+       
+               //sanity check input format
+               if(!(DataExpression.FORMAT_TYPE_VALUE_TEXT.equals(format)
+                       
||DataExpression.FORMAT_TYPE_VALUE_MATRIXMARKET.equals(format)
+                       ||DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format))) 
{
+                       throw new IOException("Invalid input format (expected: 
csv, text or mm): "+format);
+               }
                
                try {
-                       //read input matrix
-                       FrameReaderTextCell reader = 
(FrameReaderTextCell)FrameReaderFactory.createFrameReader(InputInfo.TextCellInputInfo);
+                       //read input frame
+                       InputInfo iinfo = 
DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format) ? 
+                                       InputInfo.CSVInputInfo : 
InputInfo.TextCellInputInfo;
+                       FrameReader reader = 
FrameReaderFactory.createFrameReader(iinfo);
                        FrameBlock mb = reader.readFrameFromInputStream(input, 
rows, cols);
                
                        //convert to double array
@@ -551,7 +580,6 @@ public class Connection implements Closeable
                return ret;
        }
        
-       
        ////////////////////////////////////////////
        // Read transform meta data
        ////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
index 3aac76b..321735d 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java
@@ -21,6 +21,7 @@ package org.apache.sysml.runtime.io;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.LinkedList;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -42,7 +43,6 @@ import org.apache.sysml.runtime.util.UtilFunctions;
  */
 public abstract class FrameReader 
 {
-
        public abstract FrameBlock readFrameFromHDFS( String fname, ValueType[] 
schema, String[] names, long rlen, long clen)
                throws IOException, DMLRuntimeException;
 
@@ -58,6 +58,21 @@ public abstract class FrameReader
                return readFrameFromHDFS(fname, getDefSchema(clen), 
getDefColNames(clen), rlen, clen);
        }
 
+       public abstract FrameBlock readFrameFromInputStream( InputStream is, 
ValueType[] schema, String[] names, long rlen, long clen)
+               throws IOException, DMLRuntimeException;
+
+       public FrameBlock readFrameFromInputStream( InputStream is, ValueType[] 
schema, long rlen, long clen )
+               throws IOException, DMLRuntimeException
+       {
+               return readFrameFromInputStream(is, schema, 
getDefColNames(schema.length), rlen, clen);
+       }
+
+       public FrameBlock readFrameFromInputStream( InputStream is, long rlen, 
long clen )
+               throws IOException, DMLRuntimeException
+       {
+               return readFrameFromInputStream(is, getDefSchema(clen), 
getDefColNames(clen), rlen, clen);
+       }
+
        public ValueType[] getDefSchema( long clen )
                throws IOException, DMLRuntimeException
        {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
index a9df026..32feea3 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.io;
 
 import java.io.IOException;
+import java.io.InputStream;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -38,7 +39,6 @@ import org.apache.sysml.runtime.matrix.data.FrameBlock;
  */
 public class FrameReaderBinaryBlock extends FrameReader
 {
-
        @Override
        public final FrameBlock readFrameFromHDFS(String fname, ValueType[] 
schema, String[] names, long rlen, long clen) 
                throws IOException, DMLRuntimeException 
@@ -61,6 +61,13 @@ public class FrameReaderBinaryBlock extends FrameReader
                
                return ret;
        }
+       
+       @Override
+       public FrameBlock readFrameFromInputStream(InputStream is, ValueType[] 
schema, String[] names, long rlen, long clen)
+               throws IOException, DMLRuntimeException 
+       {
+               throw new DMLRuntimeException("Not implemented yet.");
+       }
 
        protected void readBinaryBlockFrameFromHDFS( Path path, JobConf job, 
FileSystem fs, FrameBlock dest, long rlen, long clen )
                throws IOException, DMLRuntimeException

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
index 707071f..e86cbe2 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.io;
 
 import java.io.IOException;
+import java.io.InputStream;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -31,6 +32,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -38,6 +40,7 @@ import 
org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.Pair;
 import org.apache.sysml.runtime.transform.TfUtils;
+import org.apache.sysml.runtime.util.InputStreamInputFormat;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
 /**
@@ -83,6 +86,24 @@ public class FrameReaderTextCSV extends FrameReader
                
                return ret;
        }
+       
+       @Override
+       public FrameBlock readFrameFromInputStream(InputStream is, ValueType[] 
schema, String[] names, 
+                       long rlen, long clen)
+               throws IOException, DMLRuntimeException 
+       {
+               //allocate output frame block
+               ValueType[] lschema = createOutputSchema(schema, clen);
+               String[] lnames = createOutputNames(names, clen);
+               FrameBlock ret = createOutputFrameBlock(lschema, lnames, rlen);
+       
+               //core read (sequential/parallel) 
+               InputStreamInputFormat informat = new 
InputStreamInputFormat(is);
+               InputSplit split = informat.getSplits(null, 1)[0];
+               readCSVFrameFromInputSplit(split, informat, null, ret, schema, 
names, rlen, clen, 0, true);
+               
+               return ret;
+       }
 
        protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem 
fs, 
                        FrameBlock dest, ValueType[] schema, String[] names, 
long rlen, long clen) 
@@ -96,7 +117,7 @@ public class FrameReaderTextCSV extends FrameReader
                        readCSVFrameFromInputSplit(splits[i], informat, job, 
dest, schema, names, rlen, clen, 0, i==0);
        }
 
-       protected final void readCSVFrameFromInputSplit( InputSplit split, 
TextInputFormat informat, JobConf job, 
+       protected final void readCSVFrameFromInputSplit( InputSplit split, 
InputFormat<LongWritable,Text> informat, JobConf job, 
                        FrameBlock dest, ValueType[] schema, String[] names, 
long rlen, long clen, int rl, boolean first)
                throws IOException
        {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java 
b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
index e8be829..548452f 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java
@@ -47,7 +47,6 @@ import org.apache.sysml.runtime.util.UtilFunctions;
  */
 public class FrameReaderTextCell extends FrameReader
 {
-
        @Override
        public final FrameBlock readFrameFromHDFS(String fname, ValueType[] 
schema, String[] names, long rlen, long clen)
                throws IOException, DMLRuntimeException
@@ -71,11 +70,7 @@ public class FrameReaderTextCell extends FrameReader
                return ret;
        }
 
-       public final FrameBlock readFrameFromInputStream(InputStream is, long 
rlen, long clen) 
-               throws IOException, DMLRuntimeException {
-               return readFrameFromInputStream(is, getDefSchema(clen), 
getDefColNames(clen), rlen, clen);
-       }
-
+       @Override
        public final FrameBlock readFrameFromInputStream(InputStream is, 
ValueType[] schema, String[] names, long rlen, long clen) 
                throws IOException, DMLRuntimeException 
        {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java 
b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
index 75de6a8..ffe290e 100644
--- a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
@@ -21,6 +21,7 @@ package org.apache.sysml.runtime.io;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
@@ -54,6 +55,9 @@ public abstract class MatrixReader
        public abstract MatrixBlock readMatrixFromHDFS( String fname, long 
rlen, long clen, int brlen, int bclen, long estnnz )
                throws IOException, DMLRuntimeException;
 
+       public abstract MatrixBlock readMatrixFromInputStream( InputStream is, 
long rlen, long clen, int brlen, int bclen, long estnnz )
+                       throws IOException, DMLRuntimeException;
+       
        public static Path[] getSequenceFilePaths( FileSystem fs, Path file ) 
                throws IOException
        {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
index 015ae25..4c8549e 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.io;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 
@@ -73,6 +74,13 @@ public class ReaderBinaryBlock extends MatrixReader
                
                return ret;
        }
+       
+       @Override
+       public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, 
long clen, int brlen, int bclen, long estnnz) 
+               throws IOException, DMLRuntimeException 
+       {
+               throw new DMLRuntimeException("Not implemented yet.");
+       }
 
        public ArrayList<IndexedMatrixValue> 
readIndexedMatrixBlocksFromHDFS(String fname, long rlen, long clen, int brlen, 
int bclen) 
                throws IOException, DMLRuntimeException 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
index f148ceb..dcf9e7b 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.io;
 
 import java.io.IOException;
+import java.io.InputStream;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -34,7 +35,6 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 
 public class ReaderBinaryCell extends MatrixReader
 {
-
        @Override
        public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long 
clen, int brlen, int bclen, long estnnz) 
                throws IOException, DMLRuntimeException 
@@ -60,6 +60,13 @@ public class ReaderBinaryCell extends MatrixReader
                return ret;
        }
 
+       @Override
+       public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, 
long clen, int brlen, int bclen, long estnnz) 
+               throws IOException, DMLRuntimeException 
+       {
+               throw new DMLRuntimeException("Not implemented yet.");
+       }
+       
        @SuppressWarnings("deprecation")
        private void readBinaryCellMatrixFromHDFS( Path path, JobConf job, 
FileSystem fs, MatrixBlock dest, long rlen, long clen, int brlen, int bclen )
                throws IOException

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
index 9d8f368..6256955 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
@@ -21,12 +21,14 @@ package org.apache.sysml.runtime.io;
 
 import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -41,15 +43,12 @@ import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class ReaderTextCSV extends MatrixReader
 {
-
        private CSVFileFormatProperties _props = null;
        
-       public ReaderTextCSV(CSVFileFormatProperties props)
-       {
+       public ReaderTextCSV(CSVFileFormatProperties props) {
                _props = props;
        }
        
-
        @Override
        public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long 
clen, int brlen, int bclen, long estnnz) 
                throws IOException, DMLRuntimeException 
@@ -77,12 +76,31 @@ public class ReaderTextCSV extends MatrixReader
                
                return ret;
        }
-
+       
+       @Override
+       public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, 
long clen, int brlen, int bclen, long estnnz) 
+               throws IOException, DMLRuntimeException 
+       {
+               //allocate output matrix block
+               MatrixBlock ret = createOutputMatrixBlock(rlen, clen, 
(int)rlen, (int)clen, estnnz, true, false);
+               
+               //core read 
+               long lnnz = readCSVMatrixFromInputStream(is, "external 
inputstream", ret, new MutableInt(0), rlen, clen, 
+                       brlen, bclen, _props.hasHeader(), _props.getDelim(), 
_props.isFill(), _props.getFillValue(), true);
+                               
+               //finally check if change of sparse/dense block representation 
required
+               ret.setNonZeros( lnnz );
+               ret.examSparsity();
+               
+               return ret;
+       }
+       
        @SuppressWarnings("unchecked")
        private MatrixBlock readCSVMatrixFromHDFS( Path path, JobConf job, 
FileSystem fs, MatrixBlock dest, 
                        long rlen, long clen, int brlen, int bclen, boolean 
hasHeader, String delim, boolean fill, double fillValue )
                throws IOException
        {
+               //prepare file paths in alphanumeric order
                ArrayList<Path> files=new ArrayList<Path>();
                if(fs.isDirectory(path)) {
                        for(FileStatus stat: fs.listStatus(path, 
CSVReblockMR.hiddenFileFilter))
@@ -92,107 +110,118 @@ public class ReaderTextCSV extends MatrixReader
                else
                        files.add(path);
                
+               //determine matrix size via additional pass if required
                if ( dest == null ) {
                        dest = computeCSVSize(files, job, fs, hasHeader, delim, 
fill, fillValue);
                        clen = dest.getNumColumns();
                }
                
-               boolean sparse = dest.isInSparseFormat();
+               //actual read of individual files
+               long lnnz = 0;
+               MutableInt row = new MutableInt(0);
+               for(int fileNo=0; fileNo<files.size(); fileNo++) {
+                       lnnz += 
readCSVMatrixFromInputStream(fs.open(files.get(fileNo)), path.toString(), dest, 
+                               row, rlen, clen, brlen, bclen, hasHeader, 
delim, fill, fillValue, fileNo==0);
+               }
+               
+               //post processing
+               dest.setNonZeros( lnnz );
                
-               /////////////////////////////////////////
+               return dest;
+       }
+       
+       /**
+        * Reads csv lines from the given input stream into {@code dest}, starting at
+        * row {@code rowPos.intValue()}, and afterwards advances {@code rowPos} to the
+        * row following the last line read. The header line is skipped only if both
+        * {@code first} and {@code hasHeader} are true (the multi-file caller above
+        * passes {@code fileNo==0} as {@code first}).
+        * 
+        * @param is input stream to read; closed by this method (the wrapping reader
+        *           is closed in the finally block)
+        * @param srcInfo description of the source, passed to the column-count check
+        *           (presumably used in its error message - confirm)
+        * @param dest target block; written via appendValue if it is in sparse format,
+        *           otherwise via setValueDenseUnsafe
+        * @param rowPos in/out row cursor
+        * @param rlen not used in this body (NOTE(review))
+        * @param clen expected number of columns, passed to the column-count check
+        * @param brlen not used in this body (NOTE(review))
+        * @param bclen not used in this body (NOTE(review))
+        * @param hasHeader whether the input has a header line
+        * @param delim cell delimiter, passed to IOUtilFunctions.split
+        * @param fill whether empty cells are allowed (passed to the empty-field check)
+        * @param fillValue value used for empty cells
+        * @param first whether this is the first input (controls header skipping)
+        * @return number of non-zero values written into {@code dest}
+        * @throws IOException if reading from the stream fails
+        */
+       private long readCSVMatrixFromInputStream( InputStream is, String 
srcInfo, MatrixBlock dest, MutableInt rowPos, 
+                       long rlen, long clen, int brlen, int bclen, boolean 
hasHeader, String delim, boolean fill, double fillValue, boolean first )
+               throws IOException
+       {
+               //select the write path once: sparse append vs. direct dense writes
+               boolean sparse = dest.isInSparseFormat();
                String value = null;
-               int row = 0;
-               int col = -1;
+               int row = rowPos.intValue();
                double cellValue = 0;
                long lnnz = 0;
                
-               for(int fileNo=0; fileNo<files.size(); fileNo++)
+               BufferedReader br = new BufferedReader(new 
InputStreamReader(is));
+               // NOTE(review): no charset is passed, so bytes are decoded with the
+               // platform default charset.
+               // NOTE(review): the header is consumed before the try block, so br is
+               // not closed if this readLine() throws.
+               if(first && hasHeader ) 
+                       br.readLine(); //ignore header
+               
+               // Read the data
+               boolean emptyValuesFound = false;
+               try
                {
-                       BufferedReader br = new BufferedReader(new 
InputStreamReader(fs.open(files.get(fileNo))));
-                       if(fileNo==0 && hasHeader ) 
-                               br.readLine(); //ignore header
-                       
-                       // Read the data
-                       boolean emptyValuesFound = false;
-                       try
+                       if( sparse ) //SPARSE<-value
                        {
-                               if( sparse ) //SPARSE<-value
+                               while( (value=br.readLine())!=null ) //foreach 
line
                                {
-                                       while( (value=br.readLine())!=null ) 
//foreach line
+                                       String cellStr = 
value.toString().trim();
+                                       emptyValuesFound = false;
+                                       String[] parts = 
IOUtilFunctions.split(cellStr, delim);
+                                       int col = 0;
+                                       
+                                       for(String part : parts) //foreach cell
                                        {
-                                               String cellStr = 
value.toString().trim();
-                                               emptyValuesFound = false;
-                                               String[] parts = 
IOUtilFunctions.split(cellStr, delim);
-                                               col = 0;
-                                               
-                                               for(String part : parts) 
//foreach cell
-                                               {
-                                                       part = part.trim();
-                                                       if ( part.isEmpty() ) {
-                                                               
emptyValuesFound = true;
-                                                               cellValue = 
fillValue;
-                                                       }
-                                                       else {
-                                                               cellValue = 
UtilFunctions.parseToDouble(part);
-                                                       }
-                                                       if ( cellValue != 0 ) {
-                                                               
dest.appendValue(row, col, cellValue);
-                                                               lnnz++;
-                                                       }
-                                                       col++;
+                                               part = part.trim();
+                                               if ( part.isEmpty() ) {
+                                                       emptyValuesFound = true;
+                                                       cellValue = fillValue;
                                                }
-                                               
-                                               //sanity checks for empty 
values and number of columns
-                                               
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, fill, 
emptyValuesFound);
-                                               
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(path.toString(), cellStr, 
parts, clen);
-                                               row++;
+                                               else {
+                                                       cellValue = 
UtilFunctions.parseToDouble(part);
+                                               }
+                                               //only non-zero values are written and counted
+                                               if ( cellValue != 0 ) {
+                                                       dest.appendValue(row, 
col, cellValue);
+                                                       lnnz++;
+                                               }
+                                               col++;
                                        }
-                               } 
-                               else //DENSE<-value
+                                       
+                                       //sanity checks for empty values and 
number of columns
+                                       
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, fill, 
emptyValuesFound);
+                                       
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(srcInfo, cellStr, parts, clen);
+                                       row++;
+                               }
+                       } 
+                       else //DENSE<-value
+                       {
+                               while( (value=br.readLine())!=null ) //foreach 
line
                                {
-                                       while( (value=br.readLine())!=null ) 
//foreach line
+                                       String cellStr = 
value.toString().trim();
+                                       emptyValuesFound = false;
+                                       String[] parts = 
IOUtilFunctions.split(cellStr, delim);
+                                       int col = 0;
+                                       
+                                       for( String part : parts ) //foreach 
cell
                                        {
-                                               String cellStr = 
value.toString().trim();
-                                               emptyValuesFound = false;
-                                               String[] parts = 
IOUtilFunctions.split(cellStr, delim);
-                                               col = 0;
-                                               
-                                               for( String part : parts ) 
//foreach cell
-                                               {
-                                                       part = part.trim();
-                                                       if ( part.isEmpty() ) {
-                                                               
emptyValuesFound = true;
-                                                               cellValue = 
fillValue;
-                                                       }
-                                                       else {
-                                                               cellValue = 
UtilFunctions.parseToDouble(part);
-                                                       }
-                                                       if ( cellValue != 0 ) {
-                                                               
dest.setValueDenseUnsafe(row, col, cellValue);
-                                                               lnnz++;
-                                                       }
-                                                       col++;
+                                               part = part.trim();
+                                               if ( part.isEmpty() ) {
+                                                       emptyValuesFound = true;
+                                                       cellValue = fillValue;
+                                               }
+                                               else {
+                                                       cellValue = 
UtilFunctions.parseToDouble(part);
+                                               }
+                                               //only non-zero values are written and counted
+                                               if ( cellValue != 0 ) {
+                                                       
dest.setValueDenseUnsafe(row, col, cellValue);
+                                                       lnnz++;
                                                }
-                                               
-                                               //sanity checks for empty 
values and number of columns
-                                               
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, fill, 
emptyValuesFound);
-                                               
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(path.toString(), cellStr, 
parts, clen);
-                                               row++;
+                                               col++;
                                        }
+                                       
+                                       //sanity checks for empty values and 
number of columns
+                                       
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, fill, 
emptyValuesFound);
+                                       
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(srcInfo, cellStr, parts, clen);
+                                       row++;
                                }
                        }
-                       finally {
-                               IOUtilFunctions.closeSilently(br);
-                       }
+               }
+               finally {
+                       IOUtilFunctions.closeSilently(br);
                }
                
-               //post processing
-               dest.setNonZeros( lnnz );
-               
-               return dest;
+               //hand the next free row back, so a subsequent call continues after these rows
+               rowPos.setValue(row);
+               return lnnz;
        }
 
-       private MatrixBlock computeCSVSize ( List<Path> files, JobConf job, 
FileSystem fs, boolean hasHeader, String delim, boolean fill, double fillValue) 
+       private MatrixBlock computeCSVSize( List<Path> files, JobConf job, 
FileSystem fs, boolean hasHeader, String delim, boolean fill, double fillValue) 
                throws IOException 
        {               
                int nrow = -1;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
index 15d4858..75b3bd9 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.io;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -111,6 +112,13 @@ public class ReaderTextCSVParallel extends MatrixReader
                return ret;
        }
 
+       /**
+        * Not supported: this parallel reader only reads HDFS input splits (see
+        * readCSVMatrixFromHDFS) and has no input-stream code path. Callers reading
+        * from a stream should presumably use the sequential {@link ReaderTextCSV}.
+        * 
+        * @throws DMLRuntimeException always
+        */
+       @Override
+       public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz) 
+               throws IOException, DMLRuntimeException 
+       {
+               throw new DMLRuntimeException("Reading a matrix from an input stream is not supported by ReaderTextCSVParallel.");
+       }
+       
        private void readCSVMatrixFromHDFS(InputSplit[] splits, Path path, 
JobConf job, 
                        MatrixBlock dest, long rlen, long clen, int brlen, int 
bclen, 
                        boolean hasHeader, String delim, boolean fill, double 
fillValue) 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
index 1c9cba5..3b93c33 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
@@ -79,6 +79,7 @@ public class ReaderTextCell extends MatrixReader
                return ret;
        }
 
+       @Override
        public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, 
long clen, int brlen, int bclen, long estnnz) 
                throws IOException, DMLRuntimeException 
        {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java 
b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
index f693455..9501b6d 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.io;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -102,6 +103,13 @@ public class ReaderTextCellParallel extends MatrixReader
                return ret;
        }
 
+       /**
+        * Not supported: this parallel reader only reads from HDFS paths (see
+        * readTextCellMatrixFromHDFS) and has no input-stream code path. The
+        * sequential {@link ReaderTextCell} provides readMatrixFromInputStream.
+        * 
+        * @throws DMLRuntimeException always
+        */
+       @Override
+       public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz) 
+               throws IOException, DMLRuntimeException 
+       {
+               throw new DMLRuntimeException("Reading a matrix from an input stream is not supported by ReaderTextCellParallel.");
+       }
+       
        private void readTextCellMatrixFromHDFS( Path path, JobConf job, 
MatrixBlock dest, long rlen, long clen, int brlen, int bclen, boolean 
matrixMarket )
                throws IOException
        {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/util/InputStreamInputFormat.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/util/InputStreamInputFormat.java 
b/src/main/java/org/apache/sysml/runtime/util/InputStreamInputFormat.java
new file mode 100644
index 0000000..83641f6
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/util/InputStreamInputFormat.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Custom input format and record reader that expose an input stream through the
+ * Hadoop {@code InputFormat}/{@code RecordReader} API. This lets the common csv read
+ * implementation over record readers (which the parallel readers require) also
+ * read from an input stream.
+ * 
+ * <p>Lines are decoded as UTF-8, consistent with the UTF-8 encoding of Hadoop's
+ * {@code Text} values produced by the HDFS record readers.
+ */
+public class InputStreamInputFormat implements InputFormat<LongWritable, Text>
+{
+       private final InputStream _input;
+       
+       /**
+        * @param is input stream to expose; it is consumed by the record reader and
+        *           closed when the record reader is closed
+        */
+       public InputStreamInputFormat(InputStream is) {
+               _input = is;
+       }
+
+       /**
+        * Returns a single dummy split; {@code numSplits} is ignored because the
+        * stream is accessed purely over the record reader.
+        */
+       @Override
+       public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+               //return dummy handle - stream accessed purely over record reader
+               return new InputSplit[]{new FileSplit(null)};
+       }
+
+       /**
+        * Returns a line-based record reader over the wrapped input stream; the
+        * given split is ignored.
+        */
+       @Override
+       public RecordReader<LongWritable,Text> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+               return new InputStreamRecordReader(_input);
+       }
+       
+       /**
+        * Record reader that returns one line of the stream (without line terminator)
+        * per record. Keys are not populated, and progress/position are always 0.
+        */
+       private static class InputStreamRecordReader implements RecordReader<LongWritable, Text>
+       {
+               private final BufferedReader _reader;
+               
+               public InputStreamRecordReader(InputStream is) {
+                       //decode explicitly as UTF-8 (the encoding of Hadoop Text) rather than
+                       //the platform default charset, so stream and HDFS reads agree
+                       _reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
+               }
+               
+               @Override
+               public LongWritable createKey() {
+                       return new LongWritable();
+               }
+               @Override
+               public Text createValue() {
+                       return new Text();
+               }
+               @Override
+               public float getProgress() throws IOException {
+                       return 0;
+               }
+               @Override
+               public long getPos() throws IOException {
+                       return 0;
+               }
+               @Override
+               public boolean next(LongWritable key, Text value) throws IOException {
+                       String line = _reader.readLine();
+                       if( line != null )
+                               value.set(line);
+                       return (line != null);
+               }
+               @Override
+               public void close() throws IOException {
+                       //closes the wrapped input stream as well
+                       _reader.close();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/test/java/org/apache/sysml/test/integration/functions/jmlc/JMLCInputStreamReadTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/JMLCInputStreamReadTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/JMLCInputStreamReadTest.java
new file mode 100644
index 0000000..05ee456
--- /dev/null
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/JMLCInputStreamReadTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.jmlc;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.junit.Test;
+import org.apache.sysml.api.jmlc.Connection;
+import org.apache.sysml.parser.Expression.DataType;
+import org.apache.sysml.runtime.io.FrameWriter;
+import org.apache.sysml.runtime.io.FrameWriterFactory;
+import org.apache.sysml.runtime.io.IOUtilFunctions;
+import org.apache.sysml.runtime.io.MatrixWriter;
+import org.apache.sysml.runtime.io.MatrixWriterFactory;
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.OutputInfo;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.runtime.util.MapReduceTool;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+/**
+ * Integration test for reading matrices and frames in csv and text cell format
+ * from an input stream via JMLC {@link Connection}. Each test writes random data
+ * to a file, reads it back over a {@link FileInputStream}, and compares the result
+ * with the original data.
+ */
+public class JMLCInputStreamReadTest extends AutomatedTestBase 
+{
+       private final static String TEST_NAME = "jmlc";
+       private final static String TEST_DIR = "functions/jmlc/";
+       private final static String TEST_CLASS_DIR = TEST_DIR + JMLCInputStreamReadTest.class.getSimpleName() + "/";
+       
+       private final static int rows = 700;
+       private final static int cols = 3;
+       
+       private final static double sparsity1 = 0.7;
+       private final static double sparsity2 = 0.1;
+       
+       @Override
+       public void setUp() {
+               addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] { "R" }) ); 
+       }
+       
+       @Test
+       public void testInputStreamReadMatrixDenseCSV() throws IOException {
+               runJMLCInputStreamReadTest(DataType.MATRIX, false, "csv", false);
+       }
+       
+       @Test
+       public void testInputStreamReadMatrixDenseText() throws IOException {
+               runJMLCInputStreamReadTest(DataType.MATRIX, false, "text", false);
+       }
+       
+       @Test
+       public void testInputStreamReadMatrixSparseCSV() throws IOException {
+               runJMLCInputStreamReadTest(DataType.MATRIX, true, "csv", false);
+       }
+       
+       @Test
+       public void testInputStreamReadMatrixSparseText() throws IOException {
+               runJMLCInputStreamReadTest(DataType.MATRIX, true, "text", false);
+       }
+       
+       @Test
+       public void testInputStreamReadFrameDenseCSV() throws IOException {
+               runJMLCInputStreamReadTest(DataType.FRAME, false, "csv", false);
+       }
+       
+       @Test
+       public void testInputStreamReadFrameDenseText() throws IOException {
+               runJMLCInputStreamReadTest(DataType.FRAME, false, "text", false);
+       }
+       
+       @Test
+       public void testInputStreamReadFrameSparseCSV() throws IOException {
+               runJMLCInputStreamReadTest(DataType.FRAME, true, "csv", false);
+       }
+       
+       @Test
+       public void testInputStreamReadFrameSparseText() throws IOException {
+               runJMLCInputStreamReadTest(DataType.FRAME, true, "text", false);
+       }
+       
+       @Test
+       public void testInputStreamReadFrameDenseCSVMeta() throws IOException {
+               runJMLCInputStreamReadTest(DataType.FRAME, false, "csv", true);
+       }
+       
+       @Test
+       public void testInputStreamReadFrameDenseTextMeta() throws IOException {
+               runJMLCInputStreamReadTest(DataType.FRAME, false, "text", true);
+       }
+       
+       @Test
+       public void testInputStreamReadFrameSparseCSVMeta() throws IOException {
+               runJMLCInputStreamReadTest(DataType.FRAME, true, "csv", true);
+       }
+       
+       @Test
+       public void testInputStreamReadFrameSparseTextMeta() throws IOException {
+               runJMLCInputStreamReadTest(DataType.FRAME, true, "text", true);
+       }
+       
+       /**
+        * Writes random data to output("X"), reads it back from an input stream via
+        * JMLC, and compares it with the written data.
+        * 
+        * @param dt data type to test (MATRIX or FRAME)
+        * @param sparse if true, generate data with sparsity 0.1, otherwise 0.7
+        * @param format "csv" selects csv, any other value text cell format
+        * @param metaData if true (frames only), set column names before writing
+        */
+       private void runJMLCInputStreamReadTest(DataType dt, boolean sparse, String format, boolean metaData ) 
+               throws IOException
+       {
+               TestConfiguration config = getTestConfiguration(TEST_NAME);
+               loadTestConfiguration(config);
+       
+               //generate inputs
+               OutputInfo oinfo = format.equals("csv") ? OutputInfo.CSVOutputInfo : OutputInfo.TextCellOutputInfo;
+               double[][] data = TestUtils.round(getRandomMatrix(rows, cols, 0.51, 7.49, sparse?sparsity2:sparsity1, 7));
+       
+               Connection conn = new Connection();
+               
+               try
+               {
+                       if( dt == DataType.MATRIX ) 
+                       {
+                               //write input matrix
+                               MatrixBlock mb = DataConverter.convertToMatrixBlock(data);
+                               MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(oinfo);
+                               writer.writeMatrixToHDFS(mb, output("X"), rows, cols, -1, -1, -1);
+                               
+                               //read matrix from input stream (stream closed even if conversion fails)
+                               double[][] data2;
+                               try( FileInputStream fis = new FileInputStream(output("X")) ) {
+                                       data2 = conn.convertToDoubleMatrix(fis, rows, cols, format);
+                               }
+                               
+                               //compare matrix result
+                               TestUtils.compareMatrices(data, data2, rows, cols, 0);
+                       }
+                       else if( dt == DataType.FRAME )
+                       {
+                               //write input frame
+                               String[][] fdata = FrameTransformTest.createFrameData(data, "V");
+                               fdata[3][1] = "\"ab\"\"cdef\""; //test quoted tokens w/ inner quotes
+                               if( format.equals("csv") )
+                                       fdata[7][2] = "\"a,bc def\""; //test delimiter and space tokens
+                               FrameBlock fb = DataConverter.convertToFrameBlock(fdata);
+                               if( metaData ) {
+                                       fb.setColumnNames(IntStream.range(0,cols).mapToObj(i -> "CC"+i)
+                                               .collect(Collectors.toList()).toArray(new String[0]));
+                               }
+                               FrameWriter writer = FrameWriterFactory.createFrameWriter(oinfo);
+                               writer.writeFrameToHDFS(fb, output("X"), rows, cols);
+                               
+                               //read frame from input stream (stream closed even if conversion fails)
+                               String[][] fdata2;
+                               try( FileInputStream fis = new FileInputStream(output("X")) ) {
+                                       fdata2 = conn.convertToStringFrame(fis, rows, cols, format);
+                               }
+                               
+                               //compare frame result
+                               TestUtils.compareFrames(fdata, fdata2, rows, cols);
+                       }
+                       else {
+                               throw new IOException("Unsupported data type: "+dt.name());
+                       }
+               }
+               catch(Exception ex) {
+                       throw new RuntimeException(ex);
+               }
+               finally {
+                       MapReduceTool.deleteFileIfExistOnHDFS(output("X"));
+                       IOUtilFunctions.closeSilently(conn);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/test_suites/java/org/apache/sysml/test/integration/functions/jmlc/ZPackageSuite.java
----------------------------------------------------------------------
diff --git 
a/src/test_suites/java/org/apache/sysml/test/integration/functions/jmlc/ZPackageSuite.java
 
b/src/test_suites/java/org/apache/sysml/test/integration/functions/jmlc/ZPackageSuite.java
index 9bf6a1c..9eb6af1 100644
--- 
a/src/test_suites/java/org/apache/sysml/test/integration/functions/jmlc/ZPackageSuite.java
+++ 
b/src/test_suites/java/org/apache/sysml/test/integration/functions/jmlc/ZPackageSuite.java
@@ -34,6 +34,7 @@ import org.junit.runners.Suite;
        FrameReadMetaTest.class,
        FrameTransformTest.class,
        JMLCInputOutputTest.class,
+       JMLCInputStreamReadTest.class,
        ReuseModelVariablesTest.class,
        SystemTMulticlassSVMScoreTest.class
 })


Reply via email to