Repository: incubator-systemml
Updated Branches:
  refs/heads/master 02a9f2770 -> 1be911cc5


[SYSTEMML-568] Fix various issues w/ mlcontext frame support

The commit 02a9f277000bd144c729311dac6c04bcb520180f introduced a couple
of major and minor issues. This patch resolves the following:

(1) Removed spark references from general-purpose util functions. These
references would cause ClassNotFoundException errors in non-spark
environments.

(2) Added the missing handling of delimiters other than comma in custom
csv frame conversions.

(3) Use logging instead of prints to stdout, which are inconsistent with
the rest of the system and make debugging much harder.

(4) Removed redundant code for handling of scratch space and blocksize
configuration from mlcontext utils.

(5) Removed unnecessary rdd operations from mlcontext result handling.

(6) Fixed handling of csv properties in mlcontext csv frame conversion
(which so far used only hard-coded defaults), as well as handling of
quoted strings.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/1be911cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/1be911cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/1be911cc

Branch: refs/heads/master
Commit: 1be911cc52b4217308faa074d2b0ac4d9b86c540
Parents: 02a9f27
Author: Matthias Boehm <[email protected]>
Authored: Mon Aug 29 17:41:25 2016 +0200
Committer: Matthias Boehm <[email protected]>
Committed: Mon Aug 29 17:41:25 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/sysml/api/MLContext.java    | 21 ++---
 .../java/org/apache/sysml/api/MLOutput.java     | 16 +---
 .../sysml/api/mlcontext/MLContextUtil.java      |  8 +-
 .../spark/utils/FrameRDDConverterUtils.java     | 73 ++++++++++++++++-
 .../sysml/runtime/util/UtilFunctions.java       | 84 --------------------
 .../functions/frame/FrameConverterTest.java     |  4 +-
 .../functions/mlcontext/FrameTest.java          |  8 +-
 7 files changed, 96 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1be911cc/src/main/java/org/apache/sysml/api/MLContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLContext.java 
b/src/main/java/org/apache/sysml/api/MLContext.java
index 8f6e95f..297d3a5 100644
--- a/src/main/java/org/apache/sysml/api/MLContext.java
+++ b/src/main/java/org/apache/sysml/api/MLContext.java
@@ -524,16 +524,16 @@ public class MLContext {
                
                MatrixObject mo;
                if( format.equals("csv") ) {
-                       //TODO replace default block size
-                       MatrixCharacteristics mc = new 
MatrixCharacteristics(rlen, clen, OptimizerUtils.DEFAULT_BLOCKSIZE, 
OptimizerUtils.DEFAULT_BLOCKSIZE, nnz);
+                       int blksz = ConfigurationManager.getBlocksize();
+                       MatrixCharacteristics mc = new 
MatrixCharacteristics(rlen, clen, blksz, blksz, nnz);
                        mo = new MatrixObject(ValueType.DOUBLE, null, new 
MatrixFormatMetaData(mc, OutputInfo.CSVOutputInfo, InputInfo.CSVInputInfo));
                }
                else if( format.equals("text") ) {
                        if(rlen == -1 || clen == -1) {
                                throw new DMLRuntimeException("The metadata is 
required in registerInput for format:" + format);
                        }
-                       //TODO replace default block size
-                       MatrixCharacteristics mc = new 
MatrixCharacteristics(rlen, clen, OptimizerUtils.DEFAULT_BLOCKSIZE, 
OptimizerUtils.DEFAULT_BLOCKSIZE, nnz);
+                       int blksz = ConfigurationManager.getBlocksize();
+                       MatrixCharacteristics mc = new 
MatrixCharacteristics(rlen, clen, blksz, blksz, nnz);
                        mo = new MatrixObject(ValueType.DOUBLE, null, new 
MatrixFormatMetaData(mc, OutputInfo.TextCellOutputInfo, 
InputInfo.TextCellInputInfo));
                }
                else if( format.equals("mm") ) {
@@ -583,21 +583,24 @@ public class MLContext {
                if(_inVarnames == null)
                        _inVarnames = new ArrayList<String>();
                
+               //FIXME: MB why does the register input for frames implicitly 
convert the data to binary block,
+               //while the register input for matrices does not? FIXME
+               
                JavaPairRDD<LongWritable, Text> rddText = rddIn.mapToPair(new 
ConvertStringToLongTextPair());
                
-               MatrixCharacteristics mc = new MatrixCharacteristics(rlen, 
clen, OptimizerUtils.DEFAULT_BLOCKSIZE, OptimizerUtils.DEFAULT_BLOCKSIZE, nnz);
+               int blksz = ConfigurationManager.getBlocksize();
+               MatrixCharacteristics mc = new MatrixCharacteristics(rlen, 
clen, blksz, blksz, nnz);
                FrameObject fo = new FrameObject(null, new 
MatrixFormatMetaData(mc, OutputInfo.BinaryBlockOutputInfo, 
InputInfo.BinaryBlockInputInfo));
                JavaPairRDD<Long, FrameBlock> rdd = null; 
                if( format.equals("csv") ) {
-                       //TODO replace default block size
-                       
-                       rdd = FrameRDDConverterUtils.csvToBinaryBlock(new 
JavaSparkContext(getSparkContext()), rddText, mc, false, ",", false, -1);
+                       CSVFileFormatProperties csvprops = (props!=null) ? 
(CSVFileFormatProperties)props: new CSVFileFormatProperties();
+                       rdd = FrameRDDConverterUtils.csvToBinaryBlock(new 
JavaSparkContext(getSparkContext()), 
+                                       rddText, mc, csvprops.hasHeader(), 
csvprops.getDelim(), csvprops.isFill(), csvprops.getFillValue());
                }
                else if( format.equals("text") ) {
                        if(rlen == -1 || clen == -1) {
                                throw new DMLRuntimeException("The metadata is 
required in registerInput for format:" + format);
                        }
-                       //TODO replace default block size
                        rdd = FrameRDDConverterUtils.textCellToBinaryBlock(new 
JavaSparkContext(getSparkContext()), rddText, mc, schema);
                }
                else {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1be911cc/src/main/java/org/apache/sysml/api/MLOutput.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLOutput.java 
b/src/main/java/org/apache/sysml/api/MLOutput.java
index 916a652..a18ab60 100644
--- a/src/main/java/org/apache/sysml/api/MLOutput.java
+++ b/src/main/java/org/apache/sysml/api/MLOutput.java
@@ -79,25 +79,17 @@ public class MLOutput {
        @SuppressWarnings("unchecked")
        public JavaPairRDD<MatrixIndexes,MatrixBlock> 
getBinaryBlockedRDD(String varName) throws DMLRuntimeException {
                if(_outputs.containsKey(varName)) {
-                       JavaPairRDD<?,?> tmp = _outputs.get(varName);
-                       if (tmp.first()._2() instanceof MatrixBlock)
-                               return 
(JavaPairRDD<MatrixIndexes,MatrixBlock>)tmp;
-                       else
-                               return null;
+                       return (JavaPairRDD<MatrixIndexes,MatrixBlock>) 
_outputs.get(varName);
                }
-               throw new DMLRuntimeException("Variable " + varName + " not 
found in the output symbol table.");
+               throw new DMLRuntimeException("Variable " + varName + " not 
found in the outputs.");
        }
        
        @SuppressWarnings("unchecked")
        public JavaPairRDD<Long,FrameBlock> getFrameBinaryBlockedRDD(String 
varName) throws DMLRuntimeException {
                if(_outputs.containsKey(varName)) {
-                       JavaPairRDD<?,?> tmp = _outputs.get(varName);
-                       if (tmp.first()._2() instanceof FrameBlock)
-                               return (JavaPairRDD<Long,FrameBlock>)tmp;
-                       else
-                               return null;
+                       return 
(JavaPairRDD<Long,FrameBlock>)_outputs.get(varName);
                }
-               throw new DMLRuntimeException("Variable " + varName + " not 
found in the output symbol table.");
+               throw new DMLRuntimeException("Variable " + varName + " not 
found in the outputs.");
        }
        
        public MatrixCharacteristics getMatrixCharacteristics(String varName) 
throws DMLRuntimeException {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1be911cc/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java 
b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
index 120df32..6da6468 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java
@@ -515,9 +515,7 @@ public final class MLContextUtil {
         * @return the default matrix block size
         */
        public static int defaultBlockSize() {
-               DMLConfig conf = ConfigurationManager.getDMLConfig();
-               int blockSize = conf.getIntValue(DMLConfig.DEFAULT_BLOCK_SIZE);
-               return blockSize;
+               return ConfigurationManager.getBlocksize();
        }
 
        /**
@@ -526,9 +524,7 @@ public final class MLContextUtil {
         * @return the lcoation of the scratch space directory
         */
        public static String scratchSpace() {
-               DMLConfig conf = ConfigurationManager.getDMLConfig();
-               String scratchSpace = 
conf.getTextValue(DMLConfig.SCRATCH_SPACE);
-               return scratchSpace;
+               return ConfigurationManager.getScratchSpace(); 
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1be911cc/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index 3bd163c..fa8c48f 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -27,6 +27,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Arrays;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -69,6 +71,8 @@ import org.apache.sysml.runtime.util.UtilFunctions;
 
 public class FrameRDDConverterUtils 
 {
+       private static final Log LOG = 
LogFactory.getLog(FrameRDDConverterUtils.class.getName());
+       
        //=====================================
        // CSV <--> Binary block
 
@@ -363,12 +367,79 @@ public class FrameRDDConverterUtils
                JavaRDD<Row> rowRDD = in.flatMap(new 
BinaryBlockToDataFrameFunction());
                                
                SQLContext sqlContext = new SQLContext(sc);
-               StructType dfSchema = 
UtilFunctions.convertFrameSchemaToDFSchema(schema);
+               StructType dfSchema = convertFrameSchemaToDFSchema(schema);
                DataFrame df = sqlContext.createDataFrame(rowRDD, dfSchema);
        
                return df;
        }
        
+       
+       /*
+        * This function will convert Frame schema into DataFrame schema 
+        * 
+        *  @param      schema
+        *              Frame schema in the form of List<ValueType>
+        *  @return
+        *              Returns the DataFrame schema (StructType)
+        */
+       public static StructType convertFrameSchemaToDFSchema(List<ValueType> 
lschema)
+       {
+               // Generate the schema based on the string of schema
+               List<StructField> fields = new ArrayList<StructField>();
+               
+               int i = 1;
+               for (ValueType schema : lschema) {
+                       org.apache.spark.sql.types.DataType dataType = 
DataTypes.StringType;
+                       switch(schema) {
+                               case STRING:  dataType = DataTypes.StringType; 
break;
+                               case DOUBLE:  dataType = DataTypes.DoubleType; 
break;
+                               case INT:     dataType = DataTypes.LongType; 
break;
+                               case BOOLEAN: dataType = DataTypes.BooleanType; 
break;
+                               default:
+                                       LOG.warn("Using default type String for 
" + schema.toString());
+                       }
+                       fields.add(DataTypes.createStructField("C"+i++, 
dataType, true));
+               }
+               
+               return DataTypes.createStructType(fields);
+       }
+       
+       /* 
+        * It will return JavaRDD<Row> based on csv data input file.
+        */
+       public static JavaRDD<Row> getRowRDD(JavaSparkContext sc, String 
fnameIn, String delim, List<ValueType> schema)
+       {
+               // Load a text file and convert each line to a java rdd.
+               JavaRDD<String> dataRdd = sc.textFile(fnameIn);
+               return dataRdd.map(new RowGenerator(schema, delim));
+       }
+       
+       /* 
+        * Row Generator class based on individual line in CSV file.
+        */
+       private static class RowGenerator implements Function<String,Row> 
+       {
+               private static final long serialVersionUID = 
-6736256507697511070L;
+
+               private List<ValueType> _schema = null;
+               private String _delim = null; 
+               
+               public RowGenerator(List<ValueType> schema, String delim) {
+                       _schema = schema;
+                       _delim = delim;
+               }               
+                
+               @Override
+               public Row call(String record) throws Exception {
+                     String[] fields = IOUtilFunctions.splitCSV(record, 
_delim);
+                     Object[] objects = new Object[fields.length]; 
+                     for (int i=0; i<fields.length; i++) {
+                             objects[i] = 
UtilFunctions.stringToObject(_schema.get(i), fields[i]);
+                     }
+                     return RowFactory.create(objects);
+               }
+       }
+       
 
        /////////////////////////////////
        // CSV-SPECIFIC FUNCTIONS

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1be911cc/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java 
b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index 4b98f88..fa17fcd 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -23,14 +23,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.instructions.InstructionUtils;
@@ -623,80 +615,4 @@ public class UtilFunctions
        
                return in1.getDataType();
        }
-       
-       /*
-        * This function will convert Frame schema into DataFrame schema 
-        * 
-        *  @param      schema
-        *              Frame schema in the form of List<ValueType>
-        *  @return
-        *              Returns the DataFrame schema (StructType)
-        */
-       public static StructType convertFrameSchemaToDFSchema(List<ValueType> 
lschema)
-       {
-               // Generate the schema based on the string of schema
-               List<StructField> fields = new ArrayList<StructField>();
-               
-               int i = 1;
-               for (ValueType schema : lschema)
-               {
-                       org.apache.spark.sql.types.DataType dataType = 
DataTypes.StringType;
-                       switch(schema)
-                       {
-                               case STRING:
-                                       dataType = DataTypes.StringType;
-                                       break;
-                               case DOUBLE:
-                                       dataType = DataTypes.DoubleType;
-                                       break;
-                               case INT:
-                                       dataType = DataTypes.LongType;
-                                       break;
-                               case BOOLEAN:
-                                       dataType = DataTypes.BooleanType;
-                                       break;
-                               default:
-                                       System.out.println("Default schema type 
is String.");
-                       }
-                       fields.add(DataTypes.createStructField("C"+i++, 
dataType, true));
-               }
-               
-               return DataTypes.createStructType(fields);
-       }
-       
-       /* 
-        * It will return JavaRDD<Row> based on csv data input file.
-        */
-       public static JavaRDD<Row> getRowRDD(JavaSparkContext sc, String 
fnameIn, String separator, List<ValueType> schema)
-       {
-               // Load a text file and convert each line to a java rdd.
-               JavaRDD<String> dataRdd = sc.textFile(fnameIn);
-               return dataRdd.map(new RowGenerator(schema));
-       }
-       
-       /* 
-        * Row Generator class based on individual line in CSV file.
-        */
-       private static class RowGenerator implements Function<String,Row> 
-       {
-               private static final long serialVersionUID = 
-6736256507697511070L;
-
-               List<ValueType> _schema = null;
-                
-               public RowGenerator(List<ValueType> schema)
-               {
-                       _schema = schema;
-               }               
-                
-               @Override
-               public Row call(String record) throws Exception {
-                     String[] fields = record.split(",");
-                     Object[] objects = new Object[fields.length]; 
-                     for (int i=0; i<fields.length; i++) {
-                             objects[i] = 
UtilFunctions.stringToObject(_schema.get(i), fields[i]);
-                     }
-                     return RowFactory.create(objects);
-               }
-       }
-       
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1be911cc/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
index fb076bd..107dee3 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
@@ -519,8 +519,8 @@ public class FrameConverterTest extends AutomatedTestBase
 
                                //Create DataFrame 
                                SQLContext sqlContext = new SQLContext(sc);
-                               StructType dfSchema = 
UtilFunctions.convertFrameSchemaToDFSchema(schema);
-                               JavaRDD<Row> rowRDD = 
UtilFunctions.getRowRDD(sc, fnameIn, separator, schema);
+                               StructType dfSchema = 
FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schema);
+                               JavaRDD<Row> rowRDD = 
FrameRDDConverterUtils.getRowRDD(sc, fnameIn, separator, schema);
                                DataFrame df = 
sqlContext.createDataFrame(rowRDD, dfSchema);
                                
                                JavaPairRDD<LongWritable, FrameBlock> rddOut = 
FrameRDDConverterUtils

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1be911cc/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
index b6184cf..b3324df 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
@@ -224,13 +224,13 @@ public class FrameTest extends AutomatedTestBase
                {
                        //Create DataFrame for input A 
                        SQLContext sqlContext = new SQLContext(sc);
-                       StructType dfSchemaA = 
UtilFunctions.convertFrameSchemaToDFSchema(lschema);
-                       JavaRDD<Row> rowRDDA = UtilFunctions.getRowRDD(jsc, 
input("A"), DataExpression.DEFAULT_DELIM_DELIMITER, lschema);
+                       StructType dfSchemaA = 
FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschema);
+                       JavaRDD<Row> rowRDDA = 
FrameRDDConverterUtils.getRowRDD(jsc, input("A"), 
DataExpression.DEFAULT_DELIM_DELIMITER, lschema);
                        dfA = sqlContext.createDataFrame(rowRDDA, dfSchemaA);
                        
                        //Create DataFrame for input B 
-                       StructType dfSchemaB = 
UtilFunctions.convertFrameSchemaToDFSchema(lschemaB);
-                       JavaRDD<Row> rowRDDB = UtilFunctions.getRowRDD(jsc, 
input("B"), DataExpression.DEFAULT_DELIM_DELIMITER, lschemaB);
+                       StructType dfSchemaB = 
FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaB);
+                       JavaRDD<Row> rowRDDB = 
FrameRDDConverterUtils.getRowRDD(jsc, input("B"), 
DataExpression.DEFAULT_DELIM_DELIMITER, lschemaB);
                        dfB = sqlContext.createDataFrame(rowRDDB, dfSchemaB);
                }
 

Reply via email to