Repository: incubator-systemml Updated Branches: refs/heads/master 76553ebf0 -> a39aecffa
[SYSTEMML-573] Fix meta data handling of csv frame readers/writers Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/c7beb505 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/c7beb505 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/c7beb505 Branch: refs/heads/master Commit: c7beb5059ff765ba0bf4a3fa8afc7c19f95bf1b2 Parents: 76553eb Author: Matthias Boehm <[email protected]> Authored: Mon Jul 11 17:09:22 2016 -0700 Committer: Matthias Boehm <[email protected]> Committed: Tue Jul 12 11:31:53 2016 -0700 ---------------------------------------------------------------------- .../spark/utils/FrameRDDConverterUtils.java | 63 +++++++++++++++----- .../sysml/runtime/io/FrameReaderTextCSV.java | 21 ++++++- .../runtime/io/FrameReaderTextCSVParallel.java | 8 ++- .../sysml/runtime/io/FrameWriterTextCSV.java | 27 ++++++--- .../sysml/runtime/io/IOUtilFunctions.java | 5 ++ .../sysml/runtime/matrix/data/FrameBlock.java | 13 ++++ .../apache/sysml/runtime/transform/TfUtils.java | 7 ++- .../functions/frame/FrameMetaReadWriteTest.java | 19 +++--- 8 files changed, 124 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c7beb505/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java index c640d4d..c53dd34 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java @@ -53,6 +53,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock; import org.apache.sysml.runtime.matrix.data.MatrixIndexes; import org.apache.sysml.runtime.matrix.data.Pair; import org.apache.sysml.runtime.matrix.mapred.FrameReblockBuffer; +import org.apache.sysml.runtime.transform.TfUtils; import org.apache.sysml.runtime.util.DataConverter; import org.apache.sysml.runtime.util.FastStringTokenizer; import org.apache.sysml.runtime.util.UtilFunctions; @@ -85,8 +86,12 @@ public class FrameRDDConverterUtils if( !mcOut.dimsKnown(true) ) { JavaRDD<String> tmp = input.values() .map(new TextToStringFunction()); - long rlen = tmp.count() - (hasHeader ? 1 : 0); - long clen = tmp.first().split(delim).length; + String tmpStr = tmp.first(); + boolean metaHeader = tmpStr.startsWith(TfUtils.TXMTD_MVPREFIX) + || tmpStr.startsWith(TfUtils.TXMTD_NDPREFIX); + tmpStr = (metaHeader) ? tmpStr.substring(tmpStr.indexOf(delim)+1) : tmpStr; + long rlen = tmp.count() - (hasHeader ? 1 : 0) - (metaHeader ? 2 : 0); + long clen = tmpStr.split(delim).length; mcOut.set(rlen, clen, mcOut.getRowsPerBlock(), mcOut.getColsPerBlock(), -1); } @@ -396,6 +401,8 @@ public class FrameRDDConverterUtils private boolean _fill = false; private int _maxRowsPerBlock = -1; private List<String> _colnames = null; + private List<String> _mvMeta = null; //missing value meta data + private List<String> _ndMeta = null; //num distinct meta data public CSVToBinaryBlockFunction(MatrixCharacteristics mc, boolean hasHeader, String delim, boolean fill) { _clen = mc.getCols(); @@ -420,13 +427,22 @@ public class FrameRDDConverterUtils Tuple2<Text,Long> tmp = arg0.next(); String row = tmp._1().toString(); long rowix = tmp._2(); - if(!_hasHeader) // In case there is no header, rowindex to be adjusted to base 1. - rowix++; if(_hasHeader && rowix == 0) { //Skip header _colnames = Arrays.asList(row.split(_delim)); continue; } - + if( row.startsWith(TfUtils.TXMTD_MVPREFIX) ) { + _mvMeta = Arrays.asList(Arrays.copyOfRange(row.split(_delim), 1, (int)_clen+1)); + continue; + } + else if( row.startsWith(TfUtils.TXMTD_NDPREFIX) ) { + _ndMeta = Arrays.asList(Arrays.copyOfRange(row.split(_delim), 1, (int)_clen+1)); + continue; + } + + //adjust row index for header and meta data + rowix += (_hasHeader ? 0 : 1) - ((_mvMeta == null) ? 0 : 2); + if( iRowsInBlock == 0 || iRowsInBlock == _maxRowsPerBlock) { if( iRowsInBlock == _maxRowsPerBlock ) flushBlocksToList(ix, mb, ret); @@ -458,6 +474,12 @@ public class FrameRDDConverterUtils mb[0] = new FrameBlock((int)_clen, ValueType.STRING); if( _colnames != null ) mb[0].setColumnNames(_colnames); + if( _mvMeta != null ) + for( int j=0; j<_clen; j++ ) + mb[0].getColumnMetadata(j).setMvValue(_mvMeta.get(j)); + if( _ndMeta != null ) + for( int j=0; j<_clen; j++ ) + mb[0].getColumnMetadata(j).setNumDistinct(Long.parseLong(_ndMeta.get(j))); } // Flushes current state of filled column blocks to output list. @@ -493,20 +515,33 @@ public class FrameRDDConverterUtils FrameBlock blk = arg0._2(); ArrayList<String> ret = new ArrayList<String>(); + StringBuilder sb = new StringBuilder(); - //handle header information - if(_props.hasHeader() && ix==1 ) { - StringBuilder sb = new StringBuilder(); - for(int j = 1; j <= blk.getNumColumns(); j++) { - if(j != 1) - sb.append(_props.getDelim()); - sb.append("C" + j); + //handle header information and frame meta data + if( ix==1 ) { + if( _props.hasHeader() ) { + for(int j = 1; j <= blk.getNumColumns(); j++) { + sb.append(blk.getColumnNames().get(j) + + ((j<blk.getNumColumns()-1)?_props.getDelim():"")); + } + ret.add(sb.toString()); + sb.setLength(0); //reset + } + if( !blk.isColumnMetadataDefault() ) { + sb.append(TfUtils.TXMTD_MVPREFIX + _props.getDelim()); + for( int j=0; j<blk.getNumColumns(); j++ ) + sb.append(blk.getColumnMetadata(j).getMvValue() + ((j<blk.getNumColumns()-1)?_props.getDelim():"")); + ret.add(sb.toString()); + sb.setLength(0); //reset + sb.append(TfUtils.TXMTD_NDPREFIX + _props.getDelim()); + for( int j=0; j<blk.getNumColumns(); j++ ) + sb.append(blk.getColumnMetadata(j).getNumDistinct() + ((j<blk.getNumColumns()-1)?_props.getDelim():"")); + ret.add(sb.toString()); + sb.setLength(0); //reset } - ret.add(sb.toString()); } //handle Frame block data - StringBuilder sb = new StringBuilder(); Iterator<String[]> iter = blk.getStringRowIterator(); while( iter.hasNext() ) { String[] row = iter.next(); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c7beb505/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java index b5a5756..bf3d79f 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java @@ -39,6 +39,7 @@ import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.Pair; +import org.apache.sysml.runtime.transform.TfUtils; import org.apache.sysml.runtime.util.UtilFunctions; /** @@ -172,6 +173,17 @@ public class FrameReaderTextCSV extends FrameReader emptyValuesFound = false; col = 0; String[] parts = IOUtilFunctions.split(cellStr, delim); + //parse frame meta data (missing values / num distinct) + if( parts[0].equals(TfUtils.TXMTD_MVPREFIX) || parts[0].equals(TfUtils.TXMTD_NDPREFIX) ) { + if( parts[0].equals(TfUtils.TXMTD_MVPREFIX) ) + for( int j=0; j<dest.getNumColumns(); j++ ) + dest.getColumnMetadata(j).setMvValue(parts[j+1]); + else if( parts[0].equals(TfUtils.TXMTD_NDPREFIX) ) + for( int j=0; j<dest.getNumColumns(); j++ ) + dest.getColumnMetadata(j).setNumDistinct(Long.parseLong(parts[j+1])); + continue; + } + for( String part : parts ) //foreach cell { part = part.trim(); @@ -233,9 +245,12 @@ public class FrameReaderTextCSV extends FrameReader if( i==0 && _props.hasHeader() ) reader.next(key, value); - //count remaining number of rows - while ( reader.next(key, value) ) - nrow++; + //count remaining number of rows, ignore meta data + while ( reader.next(key, value) ) { + String val = value.toString(); + nrow += ( val.startsWith(TfUtils.TXMTD_MVPREFIX) + || val.startsWith(TfUtils.TXMTD_NDPREFIX)) ? 0 : 1; + } } finally { IOUtilFunctions.closeSilently(reader); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c7beb505/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java index da71905..2713231 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java @@ -41,6 +41,7 @@ import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; import org.apache.sysml.runtime.matrix.data.Pair; +import org.apache.sysml.runtime.transform.TfUtils; /** * Multi-threaded frame text csv reader. @@ -175,8 +176,11 @@ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV try { if ( _firstSplit && _hasHeader ) reader.next(key, value); - while (reader.next(key, value)) - nrows++; + while ( reader.next(key, value) ) { + String val = value.toString(); + nrows += ( val.startsWith(TfUtils.TXMTD_MVPREFIX) + || val.startsWith(TfUtils.TXMTD_NDPREFIX)) ? 0 : 1; + } } finally { IOUtilFunctions.closeSilently(reader); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c7beb505/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java index addf798..274319f 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java @@ -31,6 +31,7 @@ import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties; import org.apache.sysml.runtime.matrix.data.FrameBlock; +import org.apache.sysml.runtime.transform.TfUtils; import org.apache.sysml.runtime.util.MapReduceTool; /** @@ -125,19 +126,27 @@ public class FrameWriterTextCSV extends FrameWriter String delim = props.getDelim(); // Write header line, if needed - if( props.hasHeader() && rl==0 ) - { - //write row chunk-wise to prevent OOM on large number of columns - for( int bj=0; bj<cols; bj+=BLOCKSIZE_J ) { - for( int j=bj; j < Math.min(cols,bj+BLOCKSIZE_J); j++) { - sb.append("C"+ (j+1)); + if( rl==0 ) { + //append column names if header requested + if( props.hasHeader() ) { + for( int j=0; j<cols; j++ ) { + sb.append(src.getColumnNames().get(j)); if ( j < cols-1 ) sb.append(delim); } - br.write( sb.toString() ); - sb.setLength(0); + sb.append('\n'); + } + //append meta data + if( !src.isColumnMetadataDefault() ) { + sb.append(TfUtils.TXMTD_MVPREFIX + delim); + for( int j=0; j<cols; j++ ) + sb.append(src.getColumnMetadata(j).getMvValue() + ((j<cols-1)?delim:"")); + sb.append("\n"); + sb.append(TfUtils.TXMTD_NDPREFIX + delim); + for( int j=0; j<cols; j++ ) + sb.append(src.getColumnMetadata(j).getNumDistinct() + ((j<cols-1)?delim:"")); + sb.append("\n"); } - sb.append('\n'); br.write( sb.toString() ); sb.setLength(0); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c7beb505/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java index 1b80f90..275cfe4 100644 --- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java +++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java @@ -39,6 +39,7 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; +import org.apache.sysml.runtime.transform.TfUtils; import org.apache.sysml.runtime.util.LocalFileUtils; import org.apache.sysml.runtime.util.UtilFunctions; @@ -212,6 +213,10 @@ public class IOUtilFunctions try { if( reader.next(key, value) ) { String row = value.toString().trim(); + if( row.startsWith(TfUtils.TXMTD_MVPREFIX) ) + reader.next(key, value); + if( row.startsWith(TfUtils.TXMTD_NDPREFIX) ) + reader.next(key, value); if( !row.isEmpty() ) ncol = StringUtils.countMatches(row, delim) + 1; } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c7beb505/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java index 051ce58..64bc6fe 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java @@ -93,6 +93,8 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable this(); _schema.addAll(Collections.nCopies(ncols, vt)); _colnames = createColNames(ncols); + for( int j=0; j<ncols; j++ ) + _colmeta.add(new ColumnMetadata(0)); } public FrameBlock(List<ValueType> schema) { @@ -195,6 +197,17 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable /** * + * @return + */ + public boolean isColumnMetadataDefault() { + boolean ret = true; + for( int j=0; j<getNumColumns() && ret; j++ ) + ret &= isColumnMetadataDefault(j); + return ret; + } + + /** + * * @param c * @return */ http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c7beb505/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java index 2b63797..dd18b43 100644 --- a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java +++ b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java @@ -38,6 +38,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.wink.json4j.JSONException; import org.apache.wink.json4j.JSONObject; import org.apache.sysml.conf.ConfigurationManager; +import org.apache.sysml.lops.Lop; import org.apache.sysml.parser.DataExpression; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; @@ -83,7 +84,11 @@ public class TfUtils implements Serializable{ public static final String TXMETHOD_OMIT = "omit"; public static final String TXMETHOD_MVRCD = "mvrcd"; - //transform meta data constants + //transform meta data constants (frame-based transform) + public static final String TXMTD_MVPREFIX = "#Meta"+Lop.DATATYPE_PREFIX+"MV"; + public static final String TXMTD_NDPREFIX = "#Meta"+Lop.DATATYPE_PREFIX+"ND"; + + //transform meta data constants (old file-based transform) public static final String TXMTD_SEP = ","; public static final String TXMTD_COLTYPES = "coltypes.csv"; public static final String TXMTD_COLNAMES = "column.names"; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c7beb505/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java index 3803015..5066582 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java @@ -70,16 +70,15 @@ public class FrameMetaReadWriteTest extends AutomatedTestBase runFrameReadWriteTest(OutputInfo.TextCellOutputInfo, ExecType.SPARK); } -// TODO: add meta data support for text formats (requires consolidation with file-based transform first) -// @Test -// public void testFrameCsvCP() { -// runFrameReadWriteTest(OutputInfo.CSVOutputInfo, ExecType.CP); -// } -// -// @Test -// public void testFrameCsvSpark() { -// runFrameReadWriteTest(OutputInfo.CSVOutputInfo, ExecType.SPARK); -// } + @Test + public void testFrameCsvCP() { + runFrameReadWriteTest(OutputInfo.CSVOutputInfo, ExecType.CP); + } + + @Test + public void testFrameCsvSpark() { + runFrameReadWriteTest(OutputInfo.CSVOutputInfo, ExecType.SPARK); + } /** *
