Repository: incubator-systemml Updated Branches: refs/heads/master 1d1a9fa40 -> 23709ec60
[SYSTEMML-1464] Fix missing matrix/frame csv read from input streams Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/23709ec6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/23709ec6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/23709ec6 Branch: refs/heads/master Commit: 23709ec6088af53163147eab72b2b9c06a3a637c Parents: 1d1a9fa Author: Matthias Boehm <[email protected]> Authored: Wed Apr 5 00:14:10 2017 -0700 Committer: Matthias Boehm <[email protected]> Committed: Wed Apr 5 00:14:10 2017 -0700 ---------------------------------------------------------------------- .../org/apache/sysml/api/jmlc/Connection.java | 106 +++++++---- .../apache/sysml/runtime/io/FrameReader.java | 17 +- .../runtime/io/FrameReaderBinaryBlock.java | 9 +- .../sysml/runtime/io/FrameReaderTextCSV.java | 23 ++- .../sysml/runtime/io/FrameReaderTextCell.java | 7 +- .../apache/sysml/runtime/io/MatrixReader.java | 4 + .../sysml/runtime/io/ReaderBinaryBlock.java | 8 + .../sysml/runtime/io/ReaderBinaryCell.java | 9 +- .../apache/sysml/runtime/io/ReaderTextCSV.java | 189 +++++++++++-------- .../sysml/runtime/io/ReaderTextCSVParallel.java | 8 + .../apache/sysml/runtime/io/ReaderTextCell.java | 1 + .../runtime/io/ReaderTextCellParallel.java | 8 + .../runtime/util/InputStreamInputFormat.java | 96 ++++++++++ .../functions/jmlc/JMLCInputStreamReadTest.java | 183 ++++++++++++++++++ .../functions/jmlc/ZPackageSuite.java | 1 + 15 files changed, 540 insertions(+), 129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/api/jmlc/Connection.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/jmlc/Connection.java b/src/main/java/org/apache/sysml/api/jmlc/Connection.java index a3d7ae7..5240dc4 100644 --- a/src/main/java/org/apache/sysml/api/jmlc/Connection.java +++ b/src/main/java/org/apache/sysml/api/jmlc/Connection.java @@ -50,11 +50,9 @@ import org.apache.sysml.runtime.controlprogram.Program; import org.apache.sysml.runtime.controlprogram.caching.CacheableData; import org.apache.sysml.runtime.io.FrameReader; import org.apache.sysml.runtime.io.FrameReaderFactory; -import org.apache.sysml.runtime.io.FrameReaderTextCell; import org.apache.sysml.runtime.io.IOUtilFunctions; import org.apache.sysml.runtime.io.MatrixReader; import org.apache.sysml.runtime.io.MatrixReaderFactory; -import org.apache.sysml.runtime.io.ReaderTextCell; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.InputInfo; import org.apache.sysml.runtime.matrix.data.MatrixBlock; @@ -324,11 +322,11 @@ public class Connection implements Closeable } /** - * Converts an input string representation of a matrix in textcell format + * Converts an input string representation of a matrix in csv or textcell format * into a dense double array. The meta data string is the SystemML generated * .mtd file including the number of rows and columns. * - * @param input string matrix in textcell format + * @param input string matrix in csv or textcell format * @param meta string representing SystemML matrix metadata in JSON format * @return matrix as a two-dimensional double array * @throws IOException if IOException occurs @@ -342,15 +340,10 @@ public class Connection implements Closeable int rows = jmtd.getInt(DataExpression.READROWPARAM); int cols = jmtd.getInt(DataExpression.READCOLPARAM); String format = jmtd.getString(DataExpression.FORMAT_TYPE); - - //sanity check input format - if(!(DataExpression.FORMAT_TYPE_VALUE_TEXT.equals(format) - ||DataExpression.FORMAT_TYPE_VALUE_MATRIXMARKET.equals(format))) { - throw new IOException("Invalid input format (expected: text or mm): "+format); - } //parse the input matrix - return convertToDoubleMatrix(input, rows, cols); + InputStream is = IOUtilFunctions.toInputStream(input); + return convertToDoubleMatrix(is, rows, cols, format); } catch(Exception ex) { throw new IOException(ex); @@ -359,9 +352,7 @@ public class Connection implements Closeable /** * Converts an input string representation of a matrix in textcell format - * into a dense double array. The number of rows and columns need to be - * specified because textcell only represents non-zero values and hence - * does not define the dimensions in the general case. + * into a dense double array. * * @param input string matrix in textcell format * @param rows number of rows in the matrix @@ -378,9 +369,7 @@ public class Connection implements Closeable /** * Converts an input stream of a string matrix in textcell format - * into a dense double array. The number of rows and columns need to be - * specified because textcell only represents non-zero values and hence - * does not define the dimensions in the general case. + * into a dense double array. * * @param input InputStream to a string matrix in textcell format * @param rows number of rows in the matrix @@ -388,15 +377,40 @@ public class Connection implements Closeable * @return matrix as a two-dimensional double array * @throws IOException if IOException occurs */ - public double[][] convertToDoubleMatrix(InputStream input, int rows, int cols) + public double[][] convertToDoubleMatrix(InputStream input, int rows, int cols) throws IOException { + return convertToDoubleMatrix(input, rows, cols, DataExpression.FORMAT_TYPE_VALUE_TEXT); + } + + /** + * Converts an input stream of a string matrix in csv or textcell format + * into a dense double array. + * + * @param input InputStream to a string matrix in csv or textcell format + * @param rows number of rows in the matrix + * @param cols number of columns in the matrix + * @param format input format of the given stream + * @return matrix as a two-dimensional double array + * @throws IOException if IOException occurs + */ + public double[][] convertToDoubleMatrix(InputStream input, int rows, int cols, String format) throws IOException { double[][] ret = null; + + //sanity check input format + if(!(DataExpression.FORMAT_TYPE_VALUE_TEXT.equals(format) + ||DataExpression.FORMAT_TYPE_VALUE_MATRIXMARKET.equals(format) + ||DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format)) ) { + throw new IOException("Invalid input format (expected: csv, text or mm): "+format); + } try { //read input matrix - ReaderTextCell reader = (ReaderTextCell)MatrixReaderFactory.createMatrixReader(InputInfo.TextCellInputInfo); - MatrixBlock mb = reader.readMatrixFromInputStream(input, rows, cols, ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize(), (long)rows*cols); + InputInfo iinfo = DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format) ? + InputInfo.CSVInputInfo : InputInfo.TextCellInputInfo; + MatrixReader reader = MatrixReaderFactory.createMatrixReader(iinfo); + MatrixBlock mb = reader.readMatrixFromInputStream(input, rows, cols, + ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize(), (long)rows*cols); //convert to double array ret = DataConverter.convertToDoubleMatrix( mb ); @@ -467,11 +481,11 @@ public class Connection implements Closeable } /** - * Converts an input string representation of a frame in textcell format + * Converts an input string representation of a frame in csv or textcell format * into a dense string array. The meta data string is the SystemML generated * .mtd file including the number of rows and columns. * - * @param input string frame in textcell format + * @param input string frame in csv or textcell format * @param meta string representing SystemML frame metadata in JSON format * @return frame as a two-dimensional string array * @throws IOException if IOException occurs @@ -485,15 +499,10 @@ public class Connection implements Closeable int rows = jmtd.getInt(DataExpression.READROWPARAM); int cols = jmtd.getInt(DataExpression.READCOLPARAM); String format = jmtd.getString(DataExpression.FORMAT_TYPE); - - //sanity check input format - if(!(DataExpression.FORMAT_TYPE_VALUE_TEXT.equals(format) - ||DataExpression.FORMAT_TYPE_VALUE_MATRIXMARKET.equals(format))) { - throw new IOException("Invalid input format (expected: text or mm): "+format); - } //parse the input frame - return convertToStringFrame(input, rows, cols); + InputStream is = IOUtilFunctions.toInputStream(input); + return convertToStringFrame(is, rows, cols, format); } catch(Exception ex) { throw new IOException(ex); @@ -502,9 +511,7 @@ public class Connection implements Closeable /** * Converts an input string representation of a frame in textcell format - * into a dense string array. The number of rows and columns need to be - * specified because textcell only represents non-zero values and hence - * does not define the dimensions in the general case. + * into a dense string array. * * @param input string frame in textcell format * @param rows number of rows in the frame @@ -521,9 +528,7 @@ public class Connection implements Closeable /** * Converts an input stream of a string frame in textcell format - * into a dense string array. The number of rows and columns need to be - * specified because textcell only represents non-zero values and hence - * does not define the dimensions in the general case. + * into a dense string array. * * @param input InputStream to a string frame in textcell format * @param rows number of rows in the frame @@ -531,14 +536,38 @@ public class Connection implements Closeable * @return frame as a two-dimensional string array * @throws IOException if IOException occurs */ - public String[][] convertToStringFrame(InputStream input, int rows, int cols) + public String[][] convertToStringFrame(InputStream input, int rows, int cols) throws IOException { + return convertToStringFrame(input, rows, cols, DataExpression.FORMAT_TYPE_VALUE_TEXT); + } + + /** + * Converts an input stream of a string frame in csv or textcell format + * into a dense string array. + * + * @param input InputStream to a string frame in csv or textcell format + * @param rows number of rows in the frame + * @param cols number of columns in the frame + * @param format input format of the given stream + * @return frame as a two-dimensional string array + * @throws IOException if IOException occurs + */ + public String[][] convertToStringFrame(InputStream input, int rows, int cols, String format) throws IOException { String[][] ret = null; + + //sanity check input format + if(!(DataExpression.FORMAT_TYPE_VALUE_TEXT.equals(format) + ||DataExpression.FORMAT_TYPE_VALUE_MATRIXMARKET.equals(format) + ||DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format))) { + throw new IOException("Invalid input format (expected: csv, text or mm): "+format); + } try { - //read input matrix - FrameReaderTextCell reader = (FrameReaderTextCell)FrameReaderFactory.createFrameReader(InputInfo.TextCellInputInfo); + //read input frame + InputInfo iinfo = DataExpression.FORMAT_TYPE_VALUE_CSV.equals(format) ? + InputInfo.CSVInputInfo : InputInfo.TextCellInputInfo; + FrameReader reader = FrameReaderFactory.createFrameReader(iinfo); FrameBlock mb = reader.readFrameFromInputStream(input, rows, cols); //convert to double array @@ -551,7 +580,6 @@ public class Connection implements Closeable return ret; } - //////////////////////////////////////////// // Read transform meta data //////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/FrameReader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java index 3aac76b..321735d 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReader.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReader.java @@ -21,6 +21,7 @@ package org.apache.sysml.runtime.io; import java.io.EOFException; import java.io.IOException; +import java.io.InputStream; import java.util.LinkedList; import org.apache.hadoop.fs.FileStatus; @@ -42,7 +43,6 @@ import org.apache.sysml.runtime.util.UtilFunctions; */ public abstract class FrameReader { - public abstract FrameBlock readFrameFromHDFS( String fname, ValueType[] schema, String[] names, long rlen, long clen) throws IOException, DMLRuntimeException; @@ -58,6 +58,21 @@ public abstract class FrameReader return readFrameFromHDFS(fname, getDefSchema(clen), getDefColNames(clen), rlen, clen); } + public abstract FrameBlock readFrameFromInputStream( InputStream is, ValueType[] schema, String[] names, long rlen, long clen) + throws IOException, DMLRuntimeException; + + public FrameBlock readFrameFromInputStream( InputStream is, ValueType[] schema, long rlen, long clen ) + throws IOException, DMLRuntimeException + { + return readFrameFromInputStream(is, schema, getDefColNames(schema.length), rlen, clen); + } + + public FrameBlock readFrameFromInputStream( InputStream is, long rlen, long clen ) + throws IOException, DMLRuntimeException + { + return readFrameFromInputStream(is, getDefSchema(clen), getDefColNames(clen), rlen, clen); + } + public ValueType[] getDefSchema( long clen ) throws IOException, DMLRuntimeException { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java index a9df026..32feea3 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderBinaryBlock.java @@ -20,6 +20,7 @@ package org.apache.sysml.runtime.io; import java.io.IOException; +import java.io.InputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -38,7 +39,6 @@ import org.apache.sysml.runtime.matrix.data.FrameBlock; */ public class FrameReaderBinaryBlock extends FrameReader { - @Override public final FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names, long rlen, long clen) throws IOException, DMLRuntimeException @@ -61,6 +61,13 @@ public class FrameReaderBinaryBlock extends FrameReader return ret; } + + @Override + public FrameBlock readFrameFromInputStream(InputStream is, ValueType[] schema, String[] names, long rlen, long clen) + throws IOException, DMLRuntimeException + { + throw new DMLRuntimeException("Not implemented yet."); + } protected void readBinaryBlockFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, long rlen, long clen ) throws IOException, DMLRuntimeException http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java index 707071f..e86cbe2 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java @@ -20,6 +20,7 @@ package org.apache.sysml.runtime.io; import java.io.IOException; +import java.io.InputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -31,6 +32,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.mapred.InputFormat; import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; @@ -38,6 +40,7 @@ import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.Pair; import org.apache.sysml.runtime.transform.TfUtils; +import org.apache.sysml.runtime.util.InputStreamInputFormat; import org.apache.sysml.runtime.util.UtilFunctions; /** @@ -83,6 +86,24 @@ public class FrameReaderTextCSV extends FrameReader return ret; } + + @Override + public FrameBlock readFrameFromInputStream(InputStream is, ValueType[] schema, String[] names, + long rlen, long clen) + throws IOException, DMLRuntimeException + { + //allocate output frame block + ValueType[] lschema = createOutputSchema(schema, clen); + String[] lnames = createOutputNames(names, clen); + FrameBlock ret = createOutputFrameBlock(lschema, lnames, rlen); + + //core read (sequential/parallel) + InputStreamInputFormat informat = new InputStreamInputFormat(is); + InputSplit split = informat.getSplits(null, 1)[0]; + readCSVFrameFromInputSplit(split, informat, null, ret, schema, names, rlen, clen, 0, true); + + return ret; + } protected void readCSVFrameFromHDFS( Path path, JobConf job, FileSystem fs, FrameBlock dest, ValueType[] schema, String[] names, long rlen, long clen) @@ -96,7 +117,7 @@ public class FrameReaderTextCSV extends FrameReader readCSVFrameFromInputSplit(splits[i], informat, job, dest, schema, names, rlen, clen, 0, i==0); } - protected final void readCSVFrameFromInputSplit( InputSplit split, TextInputFormat informat, JobConf job, + protected final void readCSVFrameFromInputSplit( InputSplit split, InputFormat<LongWritable,Text> informat, JobConf job, FrameBlock dest, ValueType[] schema, String[] names, long rlen, long clen, int rl, boolean first) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java index e8be829..548452f 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCell.java @@ -47,7 +47,6 @@ import org.apache.sysml.runtime.util.UtilFunctions; */ public class FrameReaderTextCell extends FrameReader { - @Override public final FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names, long rlen, long clen) throws IOException, DMLRuntimeException @@ -71,11 +70,7 @@ public class FrameReaderTextCell extends FrameReader return ret; } - public final FrameBlock readFrameFromInputStream(InputStream is, long rlen, long clen) - throws IOException, DMLRuntimeException { - return readFrameFromInputStream(is, getDefSchema(clen), getDefColNames(clen), rlen, clen); - } - + @Override public final FrameBlock readFrameFromInputStream(InputStream is, ValueType[] schema, String[] names, long rlen, long clen) throws IOException, DMLRuntimeException { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java index 75de6a8..ffe290e 100644 --- a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java +++ b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java @@ -21,6 +21,7 @@ package org.apache.sysml.runtime.io; import java.io.EOFException; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -54,6 +55,9 @@ public abstract class MatrixReader public abstract MatrixBlock readMatrixFromHDFS( String fname, long rlen, long clen, int brlen, int bclen, long estnnz ) throws IOException, DMLRuntimeException; + public abstract MatrixBlock readMatrixFromInputStream( InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz ) + throws IOException, DMLRuntimeException; + public static Path[] getSequenceFilePaths( FileSystem fs, Path file ) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java index 015ae25..4c8549e 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java @@ -20,6 +20,7 @@ package org.apache.sysml.runtime.io; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; @@ -73,6 +74,13 @@ public class ReaderBinaryBlock extends MatrixReader return ret; } + + @Override + public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz) + throws IOException, DMLRuntimeException + { + throw new DMLRuntimeException("Not implemented yet."); + } public ArrayList<IndexedMatrixValue> readIndexedMatrixBlocksFromHDFS(String fname, long rlen, long clen, int brlen, int bclen) throws IOException, DMLRuntimeException http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java index f148ceb..dcf9e7b 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java @@ -20,6 +20,7 @@ package org.apache.sysml.runtime.io; import java.io.IOException; +import java.io.InputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -34,7 +35,6 @@ import org.apache.sysml.runtime.matrix.data.MatrixIndexes; public class ReaderBinaryCell extends MatrixReader { - @Override public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int brlen, int bclen, long estnnz) throws IOException, DMLRuntimeException @@ -60,6 +60,13 @@ public class ReaderBinaryCell extends MatrixReader return ret; } + @Override + public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz) + throws IOException, DMLRuntimeException + { + throw new DMLRuntimeException("Not implemented yet."); + } + @SuppressWarnings("deprecation") private void readBinaryCellMatrixFromHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock dest, long rlen, long clen, int brlen, int bclen ) throws IOException http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java index 9d8f368..6256955 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java @@ -21,12 +21,14 @@ package org.apache.sysml.runtime.io; import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.Collections; import java.util.List; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.mutable.MutableInt; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -41,15 +43,12 @@ import org.apache.sysml.runtime.util.UtilFunctions; public class ReaderTextCSV extends MatrixReader { - private CSVFileFormatProperties _props = null; - public ReaderTextCSV(CSVFileFormatProperties props) - { + public ReaderTextCSV(CSVFileFormatProperties props) { _props = props; } - @Override public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int brlen, int bclen, long estnnz) throws IOException, DMLRuntimeException @@ -77,12 +76,31 @@ public class ReaderTextCSV extends MatrixReader return ret; } - + + @Override + public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz) + throws IOException, DMLRuntimeException + { + //allocate output matrix block + MatrixBlock ret = createOutputMatrixBlock(rlen, clen, (int)rlen, (int)clen, estnnz, true, false); + + //core read + long lnnz = readCSVMatrixFromInputStream(is, "external inputstream", ret, new MutableInt(0), rlen, clen, + brlen, bclen, _props.hasHeader(), _props.getDelim(), _props.isFill(), _props.getFillValue(), true); + + //finally check if change of sparse/dense block representation required + ret.setNonZeros( lnnz ); + ret.examSparsity(); + + return ret; + } + @SuppressWarnings("unchecked") private MatrixBlock readCSVMatrixFromHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock dest, long rlen, long clen, int brlen, int bclen, boolean hasHeader, String delim, boolean fill, double fillValue ) throws IOException { + //prepare file paths in alphanumeric order ArrayList<Path> files=new ArrayList<Path>(); if(fs.isDirectory(path)) { for(FileStatus stat: fs.listStatus(path, CSVReblockMR.hiddenFileFilter)) @@ -92,107 +110,118 @@ public class ReaderTextCSV extends MatrixReader else files.add(path); + //determine matrix size via additional pass if required if ( dest == null ) { dest = computeCSVSize(files, job, fs, hasHeader, delim, fill, fillValue); clen = dest.getNumColumns(); } - boolean sparse = dest.isInSparseFormat(); + //actual read of individual files + long lnnz = 0; + MutableInt row = new MutableInt(0); + for(int fileNo=0; fileNo<files.size(); fileNo++) { + lnnz += readCSVMatrixFromInputStream(fs.open(files.get(fileNo)), path.toString(), dest, + row, rlen, clen, brlen, bclen, hasHeader, delim, fill, fillValue, fileNo==0); + } + + //post processing + dest.setNonZeros( lnnz ); - ///////////////////////////////////////// + return dest; + } + + private long readCSVMatrixFromInputStream( InputStream is, String srcInfo, MatrixBlock dest, MutableInt rowPos, + long rlen, long clen, int brlen, int bclen, boolean hasHeader, String delim, boolean fill, double fillValue, boolean first ) + throws IOException + { + boolean sparse = dest.isInSparseFormat(); String value = null; - int row = 0; - int col = -1; + int row = rowPos.intValue(); double cellValue = 0; long lnnz = 0; - for(int fileNo=0; fileNo<files.size(); fileNo++) + BufferedReader br = new BufferedReader(new InputStreamReader(is)); + if(first && hasHeader ) + br.readLine(); //ignore header + + // Read the data + boolean emptyValuesFound = false; + try { - BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(files.get(fileNo)))); - if(fileNo==0 && hasHeader ) - br.readLine(); //ignore header - - // Read the data - boolean emptyValuesFound = false; - try + if( sparse ) //SPARSE<-value { - if( sparse ) //SPARSE<-value + while( (value=br.readLine())!=null ) //foreach line { - while( (value=br.readLine())!=null ) //foreach line + String cellStr = value.toString().trim(); + emptyValuesFound = false; + String[] parts = IOUtilFunctions.split(cellStr, delim); + int col = 0; + + for(String part : parts) //foreach cell { - String cellStr = value.toString().trim(); - emptyValuesFound = false; - String[] parts = IOUtilFunctions.split(cellStr, delim); - col = 0; - - for(String part : parts) //foreach cell - { - part = part.trim(); - if ( part.isEmpty() ) { - emptyValuesFound = true; - cellValue = fillValue; - } - else { - cellValue = UtilFunctions.parseToDouble(part); - } - if ( cellValue != 0 ) { - dest.appendValue(row, col, cellValue); - lnnz++; - } - col++; + part = part.trim(); + if ( part.isEmpty() ) { + emptyValuesFound = true; + cellValue = fillValue; } - - //sanity checks for empty values and number of columns - IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, fill, emptyValuesFound); - IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(path.toString(), cellStr, parts, clen); - row++; + else { + cellValue = UtilFunctions.parseToDouble(part); + } + if ( cellValue != 0 ) { + dest.appendValue(row, col, cellValue); + lnnz++; + } + col++; } - } - else //DENSE<-value + + //sanity checks for empty values and number of columns + IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, fill, emptyValuesFound); + IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(srcInfo, cellStr, parts, clen); + row++; + } + } + else //DENSE<-value + { + while( (value=br.readLine())!=null ) //foreach line { - while( (value=br.readLine())!=null ) //foreach line + String cellStr = value.toString().trim(); + emptyValuesFound = false; + String[] parts = IOUtilFunctions.split(cellStr, delim); + int col = 0; + + for( String part : parts ) //foreach cell { - String cellStr = value.toString().trim(); - emptyValuesFound = false; - String[] parts = IOUtilFunctions.split(cellStr, delim); - col = 0; - - for( String part : parts ) //foreach cell - { - part = part.trim(); - if ( part.isEmpty() ) { - emptyValuesFound = true; - cellValue = fillValue; - } - else { - cellValue = UtilFunctions.parseToDouble(part); - } - if ( cellValue != 0 ) { - dest.setValueDenseUnsafe(row, col, cellValue); - lnnz++; - } - col++; + part = part.trim(); + if ( part.isEmpty() ) { + emptyValuesFound = true; + cellValue = fillValue; + } + else { + cellValue = UtilFunctions.parseToDouble(part); + } + if ( cellValue != 0 ) { + dest.setValueDenseUnsafe(row, col, cellValue); + lnnz++; } - - //sanity checks for empty values and number of columns - IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, fill, emptyValuesFound); - IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(path.toString(), cellStr, parts, clen); - row++; + col++; } + + //sanity checks for empty values and number of columns + IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, fill, emptyValuesFound); + IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(srcInfo, cellStr, parts, clen); + row++; } } - finally { - IOUtilFunctions.closeSilently(br); - } + } + finally { + IOUtilFunctions.closeSilently(br); } - //post processing - dest.setNonZeros( lnnz ); - - return dest; + rowPos.setValue(row); + return lnnz; } - private MatrixBlock computeCSVSize ( List<Path> files, JobConf job, FileSystem fs, boolean hasHeader, String delim, boolean fill, double fillValue) + private MatrixBlock computeCSVSize( List<Path> files, JobConf job, FileSystem fs, boolean hasHeader, String delim, boolean fill, double fillValue) throws IOException { int nrow = -1; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java index 15d4858..75b3bd9 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java @@ -20,6 +20,7 @@ package org.apache.sysml.runtime.io; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -111,6 +112,13 @@ public class ReaderTextCSVParallel extends MatrixReader return ret; } + @Override + public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz) + throws IOException, DMLRuntimeException + { + throw new DMLRuntimeException("Not implemented yet."); + } + private void readCSVMatrixFromHDFS(InputSplit[] splits, Path path, JobConf job, MatrixBlock dest, long rlen, long clen, int brlen, int bclen, boolean hasHeader, String delim, boolean fill, double fillValue) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java index 1c9cba5..3b93c33 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java @@ -79,6 +79,7 @@ public class ReaderTextCell extends MatrixReader return ret; } + @Override public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz) throws IOException, DMLRuntimeException { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java index f693455..9501b6d 100644 --- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java @@ -20,6 +20,7 @@ package org.apache.sysml.runtime.io; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; @@ -102,6 +103,13 @@ public class ReaderTextCellParallel extends MatrixReader return ret; } + @Override + public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int brlen, int bclen, long estnnz) + throws IOException, DMLRuntimeException + { + throw new DMLRuntimeException("Not implemented yet."); + } + private void readTextCellMatrixFromHDFS( Path path, JobConf job, MatrixBlock dest, long rlen, long clen, int brlen, int bclen, boolean matrixMarket ) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/main/java/org/apache/sysml/runtime/util/InputStreamInputFormat.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/InputStreamInputFormat.java b/src/main/java/org/apache/sysml/runtime/util/InputStreamInputFormat.java new file mode 100644 index 0000000..83641f6 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/util/InputStreamInputFormat.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.runtime.util; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +/** + * Custom input format and record reader to redirect common implementation of csv read + * over record readers (which are required for the parallel readers) to an input stream. + * + */ +public class InputStreamInputFormat implements InputFormat<LongWritable, Text> +{ + private final InputStream _input; + + public InputStreamInputFormat(InputStream is) { + _input = is; + } + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + //return dummy handle - stream accessed purely over record reader + return new InputSplit[]{new FileSplit(null)}; + } + + @Override + public RecordReader<LongWritable,Text> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { + return new InputStreamRecordReader(_input); + } + + private static class InputStreamRecordReader implements RecordReader<LongWritable, Text> + { + private final BufferedReader _reader; + + public InputStreamRecordReader(InputStream is) { + _reader = new BufferedReader(new InputStreamReader( is )); + } + + @Override + public LongWritable createKey() { + return new LongWritable(); + } + @Override + public Text createValue() { + return new Text(); + } + @Override + public float getProgress() throws IOException { + return 0; + } + @Override + public long getPos() throws IOException { + return 0; + } + @Override + public boolean next(LongWritable key, Text value) throws IOException { + String line = _reader.readLine(); + if( line != null ) + value.set(line); + return (line != null); + } + @Override + public void close() throws IOException { + _reader.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/test/java/org/apache/sysml/test/integration/functions/jmlc/JMLCInputStreamReadTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/jmlc/JMLCInputStreamReadTest.java b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/JMLCInputStreamReadTest.java new file mode 100644 index 0000000..05ee456 --- /dev/null +++ b/src/test/java/org/apache/sysml/test/integration/functions/jmlc/JMLCInputStreamReadTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.test.integration.functions.jmlc; + +import java.io.FileInputStream; +import java.io.IOException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.junit.Test; +import org.apache.sysml.api.jmlc.Connection; +import org.apache.sysml.parser.Expression.DataType; +import org.apache.sysml.runtime.io.FrameWriter; +import org.apache.sysml.runtime.io.FrameWriterFactory; +import org.apache.sysml.runtime.io.IOUtilFunctions; +import org.apache.sysml.runtime.io.MatrixWriter; +import org.apache.sysml.runtime.io.MatrixWriterFactory; +import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.OutputInfo; +import org.apache.sysml.runtime.util.DataConverter; +import org.apache.sysml.runtime.util.MapReduceTool; +import org.apache.sysml.test.integration.AutomatedTestBase; +import org.apache.sysml.test.integration.TestConfiguration; +import org.apache.sysml.test.utils.TestUtils; + +public class JMLCInputStreamReadTest extends AutomatedTestBase +{ + private final static String TEST_NAME = "jmlc"; + private final static String TEST_DIR = "functions/jmlc/"; + private final static String TEST_CLASS_DIR = TEST_DIR + JMLCInputStreamReadTest.class.getSimpleName() + "/"; + + private final static int rows = 700; + private final static int cols = 3; + + private final static double sparsity1 = 0.7; + private final static double sparsity2 = 0.1; + + @Override + public void setUp() { + addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] { "R" }) ); + } + + @Test + public void testInputStreamReadMatrixDenseCSV() throws IOException { + runJMLCInputStreamReadTest(DataType.MATRIX, false, "csv", false); + } + + @Test + public void testInputStreamReadMatrixDenseText() throws IOException { + runJMLCInputStreamReadTest(DataType.MATRIX, false, "text", false); + } + + @Test + public void testInputStreamReadMatrixSparseCSV() throws IOException { + runJMLCInputStreamReadTest(DataType.MATRIX, true, "csv", false); + } + + @Test + public void testInputStreamReadMatrixSparseText() throws IOException { + runJMLCInputStreamReadTest(DataType.MATRIX, true, "text", false); + } + + @Test + public void testInputStreamReadFrameDenseCSV() throws IOException { + runJMLCInputStreamReadTest(DataType.FRAME, false, "csv", false); + } + + @Test + public void testInputStreamReadFrameDenseText() throws IOException { + runJMLCInputStreamReadTest(DataType.FRAME, false, "text", false); + } + + @Test + public void testInputStreamReadFrameSparseCSV() throws IOException { + runJMLCInputStreamReadTest(DataType.FRAME, true, "csv", false); + } + + @Test + public void testInputStreamReadFrameSparseText() throws IOException { + runJMLCInputStreamReadTest(DataType.FRAME, true, "text", false); + } + + @Test + public void testInputStreamReadFrameDenseCSVMeta() throws IOException { + runJMLCInputStreamReadTest(DataType.FRAME, false, "csv", true); + } + + @Test + public void testInputStreamReadFrameDenseTextMeta() throws IOException { + runJMLCInputStreamReadTest(DataType.FRAME, false, "text", true); + } + + @Test + public void testInputStreamReadFrameSparseCSVMeta() throws IOException { + runJMLCInputStreamReadTest(DataType.FRAME, true, "csv", true); + } + + @Test + public void testInputStreamReadFrameSparseTextMeta() throws IOException { + runJMLCInputStreamReadTest(DataType.FRAME, true, "text", true); + } + + private void runJMLCInputStreamReadTest(DataType dt, boolean sparse, String format, boolean metaData ) + throws IOException + { + TestConfiguration config = getTestConfiguration(TEST_NAME); + loadTestConfiguration(config); + + //generate inputs + OutputInfo oinfo = format.equals("csv") ? OutputInfo.CSVOutputInfo : OutputInfo.TextCellOutputInfo; + double[][] data = TestUtils.round(getRandomMatrix(rows, cols, 0.51, 7.49, sparse?sparsity2:sparsity1, 7)); + + Connection conn = new Connection(); + + try + { + if( dt == DataType.MATRIX ) + { + //write input matrix + MatrixBlock mb = DataConverter.convertToMatrixBlock(data); + MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(oinfo); + writer.writeMatrixToHDFS(mb, output("X"), rows, cols, -1, -1, -1); + + //read matrix from input stream + FileInputStream fis = new FileInputStream(output("X")); + double[][] data2 = conn.convertToDoubleMatrix(fis, rows, cols, format); + + //compare matrix result + TestUtils.compareMatrices(data, data2, rows, cols, 0); + } + else if( dt == DataType.FRAME ) + { + //write input frame + String[][] fdata = FrameTransformTest.createFrameData(data, "V"); + fdata[3][1] = "\"ab\"\"cdef\""; //test quoted tokens w/ inner quotes + if( format.equals("csv") ) + fdata[7][2] = "\"a,bc def\""; //test delimiter and space tokens + FrameBlock fb = DataConverter.convertToFrameBlock(fdata); + if( metaData ) { + fb.setColumnNames(IntStream.range(0,cols).mapToObj(i -> "CC"+i) + .collect(Collectors.toList()).toArray(new String[0])); + } + FrameWriter writer = FrameWriterFactory.createFrameWriter(oinfo); + writer.writeFrameToHDFS(fb, output("X"), rows, cols); + + //read frame from input stream + FileInputStream fis = new FileInputStream(output("X")); + String[][] fdata2 = conn.convertToStringFrame(fis, rows, cols, format); + + //compare frame result + TestUtils.compareFrames(fdata, fdata2, rows, cols); + } + else { + throw new IOException("Unsupported data type: "+dt.name()); + } + } + catch(Exception ex) { + throw new RuntimeException(ex); + } + finally { + MapReduceTool.deleteFileIfExistOnHDFS(output("X")); + IOUtilFunctions.closeSilently(conn); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/23709ec6/src/test_suites/java/org/apache/sysml/test/integration/functions/jmlc/ZPackageSuite.java ---------------------------------------------------------------------- diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/jmlc/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/jmlc/ZPackageSuite.java index 9bf6a1c..9eb6af1 100644 --- a/src/test_suites/java/org/apache/sysml/test/integration/functions/jmlc/ZPackageSuite.java +++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/jmlc/ZPackageSuite.java @@ -34,6 +34,7 @@ import org.junit.runners.Suite; FrameReadMetaTest.class, FrameTransformTest.class, JMLCInputOutputTest.class, + JMLCInputStreamReadTest.class, ReuseModelVariablesTest.class, SystemTMulticlassSVMScoreTest.class })
