Repository: incubator-systemml Updated Branches: refs/heads/master 02a9f2770 -> 1be911cc5
[SYSTEMML-568] Fix various issues w/ mlcontext frame support The commit 02a9f277000bd144c729311dac6c04bcb520180f introduces a couple of major and minor issues. This patch resolves the following: (1) Removed spark references from general-purpose util functions. These references would create classnotfound exceptions in non-spark environments. (2) Missing delimiter handling (other than comma) in custom csv frame conversions. (3) Logging instead of prints to stdout which are inconsistent with the rest of the system and would make debugging much harder. (4) Removed redundant code for handling of scratch space and blocksize configuration from mlcontext utils. (5) Removed unnecessary rdd operations from mlcontext result handling. (6) Fix handling of csv properties in mlcontext csv frame conversion (which only used hard-coded defaults so far) as well as handling of quoted strings. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/1be911cc Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/1be911cc Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/1be911cc Branch: refs/heads/master Commit: 1be911cc52b4217308faa074d2b0ac4d9b86c540 Parents: 02a9f27 Author: Matthias Boehm <[email protected]> Authored: Mon Aug 29 17:41:25 2016 +0200 Committer: Matthias Boehm <[email protected]> Committed: Mon Aug 29 17:41:25 2016 +0200 ---------------------------------------------------------------------- .../java/org/apache/sysml/api/MLContext.java | 21 ++--- .../java/org/apache/sysml/api/MLOutput.java | 16 +--- .../sysml/api/mlcontext/MLContextUtil.java | 8 +- .../spark/utils/FrameRDDConverterUtils.java | 73 ++++++++++++++++- .../sysml/runtime/util/UtilFunctions.java | 84 -------------------- .../functions/frame/FrameConverterTest.java | 4 +- .../functions/mlcontext/FrameTest.java | 8 +- 7 files changed, 96 insertions(+), 118 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1be911cc/src/main/java/org/apache/sysml/api/MLContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/MLContext.java b/src/main/java/org/apache/sysml/api/MLContext.java index 8f6e95f..297d3a5 100644 --- a/src/main/java/org/apache/sysml/api/MLContext.java +++ b/src/main/java/org/apache/sysml/api/MLContext.java @@ -524,16 +524,16 @@ public class MLContext { MatrixObject mo; if( format.equals("csv") ) { - //TODO replace default block size - MatrixCharacteristics mc = new MatrixCharacteristics(rlen, clen, OptimizerUtils.DEFAULT_BLOCKSIZE, OptimizerUtils.DEFAULT_BLOCKSIZE, nnz); + int blksz = ConfigurationManager.getBlocksize(); + MatrixCharacteristics mc = new MatrixCharacteristics(rlen, clen, blksz, blksz, nnz); mo = new MatrixObject(ValueType.DOUBLE, null, new MatrixFormatMetaData(mc, OutputInfo.CSVOutputInfo, InputInfo.CSVInputInfo)); } else if( format.equals("text") ) { if(rlen == -1 || clen == -1) { throw new DMLRuntimeException("The metadata is required in registerInput for format:" + format); } - //TODO replace default block size - MatrixCharacteristics mc = new MatrixCharacteristics(rlen, clen, OptimizerUtils.DEFAULT_BLOCKSIZE, OptimizerUtils.DEFAULT_BLOCKSIZE, nnz); + int blksz = ConfigurationManager.getBlocksize(); + MatrixCharacteristics mc = new MatrixCharacteristics(rlen, clen, blksz, blksz, nnz); mo = new MatrixObject(ValueType.DOUBLE, null, new MatrixFormatMetaData(mc, OutputInfo.TextCellOutputInfo, InputInfo.TextCellInputInfo)); } else if( format.equals("mm") ) { @@ -583,21 +583,24 @@ public class MLContext { if(_inVarnames == null) _inVarnames = new ArrayList<String>(); + //FIXME: MB why does the register input for frames implicitly convert the data to binary block, + //while the register input for matrices does not? 
FIXME + JavaPairRDD<LongWritable, Text> rddText = rddIn.mapToPair(new ConvertStringToLongTextPair()); - MatrixCharacteristics mc = new MatrixCharacteristics(rlen, clen, OptimizerUtils.DEFAULT_BLOCKSIZE, OptimizerUtils.DEFAULT_BLOCKSIZE, nnz); + int blksz = ConfigurationManager.getBlocksize(); + MatrixCharacteristics mc = new MatrixCharacteristics(rlen, clen, blksz, blksz, nnz); FrameObject fo = new FrameObject(null, new MatrixFormatMetaData(mc, OutputInfo.BinaryBlockOutputInfo, InputInfo.BinaryBlockInputInfo)); JavaPairRDD<Long, FrameBlock> rdd = null; if( format.equals("csv") ) { - //TODO replace default block size - - rdd = FrameRDDConverterUtils.csvToBinaryBlock(new JavaSparkContext(getSparkContext()), rddText, mc, false, ",", false, -1); + CSVFileFormatProperties csvprops = (props!=null) ? (CSVFileFormatProperties)props: new CSVFileFormatProperties(); + rdd = FrameRDDConverterUtils.csvToBinaryBlock(new JavaSparkContext(getSparkContext()), + rddText, mc, csvprops.hasHeader(), csvprops.getDelim(), csvprops.isFill(), csvprops.getFillValue()); } else if( format.equals("text") ) { if(rlen == -1 || clen == -1) { throw new DMLRuntimeException("The metadata is required in registerInput for format:" + format); } - //TODO replace default block size rdd = FrameRDDConverterUtils.textCellToBinaryBlock(new JavaSparkContext(getSparkContext()), rddText, mc, schema); } else { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1be911cc/src/main/java/org/apache/sysml/api/MLOutput.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/MLOutput.java b/src/main/java/org/apache/sysml/api/MLOutput.java index 916a652..a18ab60 100644 --- a/src/main/java/org/apache/sysml/api/MLOutput.java +++ b/src/main/java/org/apache/sysml/api/MLOutput.java @@ -79,25 +79,17 @@ public class MLOutput { @SuppressWarnings("unchecked") public JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryBlockedRDD(String varName) throws 
DMLRuntimeException { if(_outputs.containsKey(varName)) { - JavaPairRDD<?,?> tmp = _outputs.get(varName); - if (tmp.first()._2() instanceof MatrixBlock) - return (JavaPairRDD<MatrixIndexes,MatrixBlock>)tmp; - else - return null; + return (JavaPairRDD<MatrixIndexes,MatrixBlock>) _outputs.get(varName); } - throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table."); + throw new DMLRuntimeException("Variable " + varName + " not found in the outputs."); } @SuppressWarnings("unchecked") public JavaPairRDD<Long,FrameBlock> getFrameBinaryBlockedRDD(String varName) throws DMLRuntimeException { if(_outputs.containsKey(varName)) { - JavaPairRDD<?,?> tmp = _outputs.get(varName); - if (tmp.first()._2() instanceof FrameBlock) - return (JavaPairRDD<Long,FrameBlock>)tmp; - else - return null; + return (JavaPairRDD<Long,FrameBlock>)_outputs.get(varName); } - throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table."); + throw new DMLRuntimeException("Variable " + varName + " not found in the outputs."); } public MatrixCharacteristics getMatrixCharacteristics(String varName) throws DMLRuntimeException { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1be911cc/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java index 120df32..6da6468 100644 --- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java +++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java @@ -515,9 +515,7 @@ public final class MLContextUtil { * @return the default matrix block size */ public static int defaultBlockSize() { - DMLConfig conf = ConfigurationManager.getDMLConfig(); - int blockSize = conf.getIntValue(DMLConfig.DEFAULT_BLOCK_SIZE); - return blockSize; + return 
ConfigurationManager.getBlocksize(); } /** @@ -526,9 +524,7 @@ public final class MLContextUtil { * @return the lcoation of the scratch space directory */ public static String scratchSpace() { - DMLConfig conf = ConfigurationManager.getDMLConfig(); - String scratchSpace = conf.getTextValue(DMLConfig.SCRATCH_SPACE); - return scratchSpace; + return ConfigurationManager.getScratchSpace(); } /** http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1be911cc/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java index 3bd163c..fa8c48f 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java @@ -27,6 +27,8 @@ import java.util.Iterator; import java.util.List; import java.util.Arrays; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.spark.api.java.JavaPairRDD; @@ -69,6 +71,8 @@ import org.apache.sysml.runtime.util.UtilFunctions; public class FrameRDDConverterUtils { + private static final Log LOG = LogFactory.getLog(FrameRDDConverterUtils.class.getName()); + //===================================== // CSV <--> Binary block @@ -363,12 +367,79 @@ public class FrameRDDConverterUtils JavaRDD<Row> rowRDD = in.flatMap(new BinaryBlockToDataFrameFunction()); SQLContext sqlContext = new SQLContext(sc); - StructType dfSchema = UtilFunctions.convertFrameSchemaToDFSchema(schema); + StructType dfSchema = convertFrameSchemaToDFSchema(schema); DataFrame df = sqlContext.createDataFrame(rowRDD, dfSchema); 
return df; } + + /* + * This function will convert Frame schema into DataFrame schema + * + * @param schema + * Frame schema in the form of List<ValueType> + * @return + * Returns the DataFrame schema (StructType) + */ + public static StructType convertFrameSchemaToDFSchema(List<ValueType> lschema) + { + // Generate the schema based on the string of schema + List<StructField> fields = new ArrayList<StructField>(); + + int i = 1; + for (ValueType schema : lschema) { + org.apache.spark.sql.types.DataType dataType = DataTypes.StringType; + switch(schema) { + case STRING: dataType = DataTypes.StringType; break; + case DOUBLE: dataType = DataTypes.DoubleType; break; + case INT: dataType = DataTypes.LongType; break; + case BOOLEAN: dataType = DataTypes.BooleanType; break; + default: + LOG.warn("Using default type String for " + schema.toString()); + } + fields.add(DataTypes.createStructField("C"+i++, dataType, true)); + } + + return DataTypes.createStructType(fields); + } + + /* + * It will return JavaRDD<Row> based on csv data input file. + */ + public static JavaRDD<Row> getRowRDD(JavaSparkContext sc, String fnameIn, String delim, List<ValueType> schema) + { + // Load a text file and convert each line to a java rdd. + JavaRDD<String> dataRdd = sc.textFile(fnameIn); + return dataRdd.map(new RowGenerator(schema, delim)); + } + + /* + * Row Generator class based on individual line in CSV file. 
+ */ + private static class RowGenerator implements Function<String,Row> + { + private static final long serialVersionUID = -6736256507697511070L; + + private List<ValueType> _schema = null; + private String _delim = null; + + public RowGenerator(List<ValueType> schema, String delim) { + _schema = schema; + _delim = delim; + } + + @Override + public Row call(String record) throws Exception { + String[] fields = IOUtilFunctions.splitCSV(record, _delim); + Object[] objects = new Object[fields.length]; + for (int i=0; i<fields.length; i++) { + objects[i] = UtilFunctions.stringToObject(_schema.get(i), fields[i]); + } + return RowFactory.create(objects); + } + } + ///////////////////////////////// // CSV-SPECIFIC FUNCTIONS http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1be911cc/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java index 4b98f88..fa17fcd 100644 --- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java +++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java @@ -23,14 +23,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.RowFactory; -import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; import org.apache.sysml.parser.Expression.DataType; import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.instructions.InstructionUtils; @@ -623,80 +615,4 @@ public class UtilFunctions return in1.getDataType(); } - - /* - * This function will convert Frame schema 
into DataFrame schema - * - * @param schema - * Frame schema in the form of List<ValueType> - * @return - * Returns the DataFrame schema (StructType) - */ - public static StructType convertFrameSchemaToDFSchema(List<ValueType> lschema) - { - // Generate the schema based on the string of schema - List<StructField> fields = new ArrayList<StructField>(); - - int i = 1; - for (ValueType schema : lschema) - { - org.apache.spark.sql.types.DataType dataType = DataTypes.StringType; - switch(schema) - { - case STRING: - dataType = DataTypes.StringType; - break; - case DOUBLE: - dataType = DataTypes.DoubleType; - break; - case INT: - dataType = DataTypes.LongType; - break; - case BOOLEAN: - dataType = DataTypes.BooleanType; - break; - default: - System.out.println("Default schema type is String."); - } - fields.add(DataTypes.createStructField("C"+i++, dataType, true)); - } - - return DataTypes.createStructType(fields); - } - - /* - * It will return JavaRDD<Row> based on csv data input file. - */ - public static JavaRDD<Row> getRowRDD(JavaSparkContext sc, String fnameIn, String separator, List<ValueType> schema) - { - // Load a text file and convert each line to a java rdd. - JavaRDD<String> dataRdd = sc.textFile(fnameIn); - return dataRdd.map(new RowGenerator(schema)); - } - - /* - * Row Generator class based on individual line in CSV file. 
- */ - private static class RowGenerator implements Function<String,Row> - { - private static final long serialVersionUID = -6736256507697511070L; - - List<ValueType> _schema = null; - - public RowGenerator(List<ValueType> schema) - { - _schema = schema; - } - - @Override - public Row call(String record) throws Exception { - String[] fields = record.split(","); - Object[] objects = new Object[fields.length]; - for (int i=0; i<fields.length; i++) { - objects[i] = UtilFunctions.stringToObject(_schema.get(i), fields[i]); - } - return RowFactory.create(objects); - } - } - } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1be911cc/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java index fb076bd..107dee3 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java @@ -519,8 +519,8 @@ public class FrameConverterTest extends AutomatedTestBase //Create DataFrame SQLContext sqlContext = new SQLContext(sc); - StructType dfSchema = UtilFunctions.convertFrameSchemaToDFSchema(schema); - JavaRDD<Row> rowRDD = UtilFunctions.getRowRDD(sc, fnameIn, separator, schema); + StructType dfSchema = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schema); + JavaRDD<Row> rowRDD = FrameRDDConverterUtils.getRowRDD(sc, fnameIn, separator, schema); DataFrame df = sqlContext.createDataFrame(rowRDD, dfSchema); JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1be911cc/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java 
---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java index b6184cf..b3324df 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java @@ -224,13 +224,13 @@ public class FrameTest extends AutomatedTestBase { //Create DataFrame for input A SQLContext sqlContext = new SQLContext(sc); - StructType dfSchemaA = UtilFunctions.convertFrameSchemaToDFSchema(lschema); - JavaRDD<Row> rowRDDA = UtilFunctions.getRowRDD(jsc, input("A"), DataExpression.DEFAULT_DELIM_DELIMITER, lschema); + StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschema); + JavaRDD<Row> rowRDDA = FrameRDDConverterUtils.getRowRDD(jsc, input("A"), DataExpression.DEFAULT_DELIM_DELIMITER, lschema); dfA = sqlContext.createDataFrame(rowRDDA, dfSchemaA); //Create DataFrame for input B - StructType dfSchemaB = UtilFunctions.convertFrameSchemaToDFSchema(lschemaB); - JavaRDD<Row> rowRDDB = UtilFunctions.getRowRDD(jsc, input("B"), DataExpression.DEFAULT_DELIM_DELIMITER, lschemaB); + StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschemaB); + JavaRDD<Row> rowRDDB = FrameRDDConverterUtils.getRowRDD(jsc, input("B"), DataExpression.DEFAULT_DELIM_DELIMITER, lschemaB); dfB = sqlContext.createDataFrame(rowRDDB, dfSchemaB); }
