Repository: incubator-systemml
Updated Branches:
  refs/heads/master 76553ebf0 -> a39aecffa


[SYSTEMML-573] Fix meta data handling of csv frame readers/writers

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/c7beb505
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/c7beb505
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/c7beb505

Branch: refs/heads/master
Commit: c7beb5059ff765ba0bf4a3fa8afc7c19f95bf1b2
Parents: 76553eb
Author: Matthias Boehm <[email protected]>
Authored: Mon Jul 11 17:09:22 2016 -0700
Committer: Matthias Boehm <[email protected]>
Committed: Tue Jul 12 11:31:53 2016 -0700

----------------------------------------------------------------------
 .../spark/utils/FrameRDDConverterUtils.java     | 63 +++++++++++++++-----
 .../sysml/runtime/io/FrameReaderTextCSV.java    | 21 ++++++-
 .../runtime/io/FrameReaderTextCSVParallel.java  |  8 ++-
 .../sysml/runtime/io/FrameWriterTextCSV.java    | 27 ++++++---
 .../sysml/runtime/io/IOUtilFunctions.java       |  5 ++
 .../sysml/runtime/matrix/data/FrameBlock.java   | 13 ++++
 .../apache/sysml/runtime/transform/TfUtils.java |  7 ++-
 .../functions/frame/FrameMetaReadWriteTest.java | 19 +++---
 8 files changed, 124 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c7beb505/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index c640d4d..c53dd34 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -53,6 +53,7 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.Pair;
 import org.apache.sysml.runtime.matrix.mapred.FrameReblockBuffer;
+import org.apache.sysml.runtime.transform.TfUtils;
 import org.apache.sysml.runtime.util.DataConverter;
 import org.apache.sysml.runtime.util.FastStringTokenizer;
 import org.apache.sysml.runtime.util.UtilFunctions;
@@ -85,8 +86,12 @@ public class FrameRDDConverterUtils
                if( !mcOut.dimsKnown(true) ) {
                        JavaRDD<String> tmp = input.values()
                                        .map(new TextToStringFunction());
-                       long rlen = tmp.count() - (hasHeader ? 1 : 0);
-                       long clen = tmp.first().split(delim).length;
+                       String tmpStr = tmp.first();
+                       boolean metaHeader = 
tmpStr.startsWith(TfUtils.TXMTD_MVPREFIX) 
+                                       || 
tmpStr.startsWith(TfUtils.TXMTD_NDPREFIX);
+                       tmpStr = (metaHeader) ? 
tmpStr.substring(tmpStr.indexOf(delim)+1) : tmpStr;
+                       long rlen = tmp.count() - (hasHeader ? 1 : 0) - 
(metaHeader ? 2 : 0);
+                       long clen = tmpStr.split(delim).length;
                        mcOut.set(rlen, clen, mcOut.getRowsPerBlock(), 
mcOut.getColsPerBlock(), -1);
                }
                
@@ -396,6 +401,8 @@ public class FrameRDDConverterUtils
                private boolean _fill = false;
                private int _maxRowsPerBlock = -1; 
                private List<String> _colnames = null;
+               private List<String> _mvMeta = null; //missing value meta data
+               private List<String> _ndMeta = null; //num distinct meta data
                
                public CSVToBinaryBlockFunction(MatrixCharacteristics mc, 
boolean hasHeader, String delim, boolean fill) {
                        _clen = mc.getCols();
@@ -420,13 +427,22 @@ public class FrameRDDConverterUtils
                                Tuple2<Text,Long> tmp = arg0.next();
                                String row = tmp._1().toString();
                                long rowix = tmp._2();
-                               if(!_hasHeader) // In case there is no header, 
rowindex to be adjusted to base 1.
-                                       rowix++;
                                if(_hasHeader && rowix == 0) { //Skip header
                                        _colnames = 
Arrays.asList(row.split(_delim));
                                        continue;
                                }
-                       
+                               if( row.startsWith(TfUtils.TXMTD_MVPREFIX) ) {
+                                       _mvMeta = 
Arrays.asList(Arrays.copyOfRange(row.split(_delim), 1, (int)_clen+1));
+                                       continue;
+                               }
+                               else if( row.startsWith(TfUtils.TXMTD_NDPREFIX) 
) {
+                                       _ndMeta = 
Arrays.asList(Arrays.copyOfRange(row.split(_delim), 1, (int)_clen+1));
+                                       continue;
+                               }
+                               
+                               //adjust row index for header and meta data
+                               rowix += (_hasHeader ? 0 : 1) - ((_mvMeta == 
null) ? 0 : 2);
+                               
                                if( iRowsInBlock == 0 || iRowsInBlock == 
_maxRowsPerBlock) {
                                        if( iRowsInBlock == _maxRowsPerBlock )
                                                flushBlocksToList(ix, mb, ret);
@@ -458,6 +474,12 @@ public class FrameRDDConverterUtils
                        mb[0] = new FrameBlock((int)_clen, ValueType.STRING);
                        if( _colnames != null )
                                mb[0].setColumnNames(_colnames);
+                       if( _mvMeta != null )
+                               for( int j=0; j<_clen; j++ )
+                                       
mb[0].getColumnMetadata(j).setMvValue(_mvMeta.get(j));
+                       if( _ndMeta != null )
+                               for( int j=0; j<_clen; j++ )
+                                       
mb[0].getColumnMetadata(j).setNumDistinct(Long.parseLong(_ndMeta.get(j)));
                }
                
                // Flushes current state of filled column blocks to output list.
@@ -493,20 +515,33 @@ public class FrameRDDConverterUtils
                        FrameBlock blk = arg0._2();
                        
                        ArrayList<String> ret = new ArrayList<String>();
+                       StringBuilder sb = new StringBuilder();
                        
-                       //handle header information
-                       if(_props.hasHeader() && ix==1 ) {
-                               StringBuilder sb = new StringBuilder();
-                               for(int j = 1; j <= blk.getNumColumns(); j++) {
-                                       if(j != 1)
-                                               sb.append(_props.getDelim());
-                                       sb.append("C" + j);
+                       //handle header information and frame meta data
+                       if( ix==1 ) {
+                               if( _props.hasHeader() ) {
+                                       for(int j = 1; j <= 
blk.getNumColumns(); j++) {
+                                               
sb.append(blk.getColumnNames().get(j) 
+                                                       + 
((j<blk.getNumColumns()-1)?_props.getDelim():""));
+                                       }
+                                       ret.add(sb.toString());
+                                       sb.setLength(0); //reset
+                               }
+                               if( !blk.isColumnMetadataDefault() ) {
+                                       sb.append(TfUtils.TXMTD_MVPREFIX + 
_props.getDelim());
+                                       for( int j=0; j<blk.getNumColumns(); 
j++ )
+                                               
sb.append(blk.getColumnMetadata(j).getMvValue() + 
((j<blk.getNumColumns()-1)?_props.getDelim():""));
+                                       ret.add(sb.toString());
+                                       sb.setLength(0); //reset
+                                       sb.append(TfUtils.TXMTD_NDPREFIX + 
_props.getDelim());
+                                       for( int j=0; j<blk.getNumColumns(); 
j++ )
+                                               
sb.append(blk.getColumnMetadata(j).getNumDistinct() + 
((j<blk.getNumColumns()-1)?_props.getDelim():""));
+                                       ret.add(sb.toString());
+                                       sb.setLength(0); //reset                
                                }
-                               ret.add(sb.toString());
                        }
                
                        //handle Frame block data
-                       StringBuilder sb = new StringBuilder();
                        Iterator<String[]> iter = blk.getStringRowIterator();
                        while( iter.hasNext() ) {
                                String[] row = iter.next();

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c7beb505/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
index b5a5756..bf3d79f 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSV.java
@@ -39,6 +39,7 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.Pair;
+import org.apache.sysml.runtime.transform.TfUtils;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
 /**
@@ -172,6 +173,17 @@ public class FrameReaderTextCSV extends FrameReader
                                emptyValuesFound = false; col = 0;
                                String[] parts = IOUtilFunctions.split(cellStr, 
delim);
                                
+                               //parse frame meta data (missing values / num 
distinct)
+                               if( parts[0].equals(TfUtils.TXMTD_MVPREFIX) || 
parts[0].equals(TfUtils.TXMTD_NDPREFIX) ) {
+                                       if( 
parts[0].equals(TfUtils.TXMTD_MVPREFIX) )
+                                               for( int j=0; 
j<dest.getNumColumns(); j++ )
+                                                       
dest.getColumnMetadata(j).setMvValue(parts[j+1]);
+                                       else if( 
parts[0].equals(TfUtils.TXMTD_NDPREFIX) )
+                                               for( int j=0; 
j<dest.getNumColumns(); j++ )
+                                                       
dest.getColumnMetadata(j).setNumDistinct(Long.parseLong(parts[j+1]));
+                                       continue;
+                               }
+                               
                                for( String part : parts ) //foreach cell
                                {
                                        part = part.trim();
@@ -233,9 +245,12 @@ public class FrameReaderTextCSV extends FrameReader
                                if( i==0 && _props.hasHeader() )
                                        reader.next(key, value);
                                
-                               //count remaining number of rows
-                               while ( reader.next(key, value) )
-                                       nrow++;
+                               //count remaining number of rows, ignore meta 
data
+                               while ( reader.next(key, value) ) {
+                                       String val = value.toString();
+                                       nrow += ( 
val.startsWith(TfUtils.TXMTD_MVPREFIX)
+                                               || 
val.startsWith(TfUtils.TXMTD_NDPREFIX)) ? 0 : 1; 
+                               }
                        }
                        finally {
                                IOUtilFunctions.closeSilently(reader);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c7beb505/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
index da71905..2713231 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameReaderTextCSVParallel.java
@@ -41,6 +41,7 @@ import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.Pair;
+import org.apache.sysml.runtime.transform.TfUtils;
 
 /**
  * Multi-threaded frame text csv reader.
@@ -175,8 +176,11 @@ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV
                        try {
                                if ( _firstSplit && _hasHeader )
                                        reader.next(key, value);
-                               while (reader.next(key, value))
-                                       nrows++;
+                               while ( reader.next(key, value) ) {
+                                       String val = value.toString();
+                                       nrows += ( 
val.startsWith(TfUtils.TXMTD_MVPREFIX)
+                                               || 
val.startsWith(TfUtils.TXMTD_NDPREFIX)) ? 0 : 1; 
+                               }
                        } 
                        finally {
                                IOUtilFunctions.closeSilently(reader);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c7beb505/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
index addf798..274319f 100644
--- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java
@@ -31,6 +31,7 @@ import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.CSVFileFormatProperties;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.transform.TfUtils;
 import org.apache.sysml.runtime.util.MapReduceTool;
 
 /**
@@ -125,19 +126,27 @@ public class FrameWriterTextCSV extends FrameWriter
                        String delim = props.getDelim();
                        
                        // Write header line, if needed
-                       if( props.hasHeader() && rl==0 ) 
-                       {
-                               //write row chunk-wise to prevent OOM on large 
number of columns
-                               for( int bj=0; bj<cols; bj+=BLOCKSIZE_J ) {
-                                       for( int j=bj; j < 
Math.min(cols,bj+BLOCKSIZE_J); j++) {
-                                               sb.append("C"+ (j+1));
+                       if( rl==0 ) {
+                               //append column names if header requested
+                               if( props.hasHeader() ) {
+                                       for( int j=0; j<cols; j++ ) {
+                                               
sb.append(src.getColumnNames().get(j));
                                                if ( j < cols-1 )
                                                        sb.append(delim);
                                        }
-                                       br.write( sb.toString() );
-                           sb.setLength(0);    
+                                       sb.append('\n');
+                               }
+                               //append meta data
+                               if( !src.isColumnMetadataDefault() ) {
+                                       sb.append(TfUtils.TXMTD_MVPREFIX + 
delim);
+                                       for( int j=0; j<cols; j++ )
+                                               
sb.append(src.getColumnMetadata(j).getMvValue() + ((j<cols-1)?delim:""));
+                                       sb.append("\n");
+                                       sb.append(TfUtils.TXMTD_NDPREFIX + 
delim);
+                                       for( int j=0; j<cols; j++ )
+                                               
sb.append(src.getColumnMetadata(j).getNumDistinct() + ((j<cols-1)?delim:""));
+                                       sb.append("\n");
                                }
-                               sb.append('\n');
                                br.write( sb.toString() );
                    sb.setLength(0);
                        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c7beb505/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
index 1b80f90..275cfe4 100644
--- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.sysml.runtime.transform.TfUtils;
 import org.apache.sysml.runtime.util.LocalFileUtils;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
@@ -212,6 +213,10 @@ public class IOUtilFunctions
                        try {
                                if( reader.next(key, value) ) {
                                        String row = value.toString().trim();
+                                       if( 
row.startsWith(TfUtils.TXMTD_MVPREFIX) )
+                                               reader.next(key, value);
+                                       if( 
row.startsWith(TfUtils.TXMTD_NDPREFIX) )
+                                               reader.next(key, value);
                                        if( !row.isEmpty() )
                                                ncol = 
StringUtils.countMatches(row, delim) + 1;
                                }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c7beb505/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index 051ce58..64bc6fe 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -93,6 +93,8 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
                this();
                _schema.addAll(Collections.nCopies(ncols, vt));
                _colnames = createColNames(ncols);
+               for( int j=0; j<ncols; j++ )
+                       _colmeta.add(new ColumnMetadata(0));
        }
        
        public FrameBlock(List<ValueType> schema) {
@@ -195,6 +197,17 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
        
        /**
         * 
+        * @return
+        */
+       public boolean isColumnMetadataDefault() {
+               boolean ret = true;
+               for( int j=0; j<getNumColumns() && ret; j++ )
+                       ret &= isColumnMetadataDefault(j);
+               return ret;
+       }
+       
+       /**
+        * 
         * @param c
         * @return
         */

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c7beb505/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
index 2b63797..dd18b43 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/TfUtils.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.wink.json4j.JSONException;
 import org.apache.wink.json4j.JSONObject;
 import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.lops.Lop;
 import org.apache.sysml.parser.DataExpression;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
@@ -83,7 +84,11 @@ public class TfUtils implements Serializable{
        public static final String TXMETHOD_OMIT      = "omit";
        public static final String TXMETHOD_MVRCD     = "mvrcd";
                
-       //transform meta data constants
+       //transform meta data constants (frame-based transform)
+       public static final String TXMTD_MVPREFIX = 
"#Meta"+Lop.DATATYPE_PREFIX+"MV";
+       public static final String TXMTD_NDPREFIX = 
"#Meta"+Lop.DATATYPE_PREFIX+"ND";
+       
+       //transform meta data constants (old file-based transform)
        public static final String TXMTD_SEP         = ",";
        public static final String TXMTD_COLTYPES    = "coltypes.csv";  
        public static final String TXMTD_COLNAMES    = "column.names";

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/c7beb505/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java
index 3803015..5066582 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameMetaReadWriteTest.java
@@ -70,16 +70,15 @@ public class FrameMetaReadWriteTest extends AutomatedTestBase
                runFrameReadWriteTest(OutputInfo.TextCellOutputInfo, 
ExecType.SPARK);
        }
        
-// TODO: add meta data support for text formats (requires consolidation with 
file-based transform first)       
-//     @Test
-//     public void testFrameCsvCP()  {
-//             runFrameReadWriteTest(OutputInfo.CSVOutputInfo, ExecType.CP);
-//     }
-//
-//     @Test
-//     public void testFrameCsvSpark()  {
-//             runFrameReadWriteTest(OutputInfo.CSVOutputInfo, ExecType.SPARK);
-//     }
+       @Test
+       public void testFrameCsvCP()  {
+               runFrameReadWriteTest(OutputInfo.CSVOutputInfo, ExecType.CP);
+       }
+
+       @Test
+       public void testFrameCsvSpark()  {
+               runFrameReadWriteTest(OutputInfo.CSVOutputInfo, ExecType.SPARK);
+       }
        
        /**
         * 

Reply via email to