Repository: incubator-systemml Updated Branches: refs/heads/master bbc77e71e -> e7510a03b
[SYSTEMML-1280] Restore and deprecate SQLContext methods Restore Java SQLContext method signatures (migrated to SparkSession by SYSTEMML-1194) in case any users are using the old SystemML methods and are unable to use SparkSessions. Applies to old MLContext, MLMatrix, MLOutput, FrameRDDConverterUtils, RDDConverterUtils, and RDDConverterUtilsExt. Closes #396. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/e7510a03 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/e7510a03 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/e7510a03 Branch: refs/heads/master Commit: e7510a03b95929b34e2fe6632c640851816716ea Parents: bbc77e7 Author: Deron Eriksson <[email protected]> Authored: Thu Feb 16 15:12:36 2017 -0800 Committer: Deron Eriksson <[email protected]> Committed: Thu Feb 16 15:12:36 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/sysml/api/MLContext.java | 18 +++++- .../java/org/apache/sysml/api/MLMatrix.java | 20 +++++- .../java/org/apache/sysml/api/MLOutput.java | 66 +++++++++++++++++++- .../spark/utils/FrameRDDConverterUtils.java | 9 +++ .../spark/utils/RDDConverterUtils.java | 9 +++ .../spark/utils/RDDConverterUtilsExt.java | 17 ++++- 6 files changed, 133 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/api/MLContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/MLContext.java b/src/main/java/org/apache/sysml/api/MLContext.java index a128c37..a5dc859 100644 --- a/src/main/java/org/apache/sysml/api/MLContext.java +++ b/src/main/java/org/apache/sysml/api/MLContext.java @@ -37,6 +37,7 @@ import org.apache.spark.api.java.JavaSparkContext; import 
org.apache.spark.rdd.RDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; import org.apache.sysml.api.jmlc.JMLCUtils; @@ -1593,5 +1594,20 @@ public class MLContext { JavaPairRDD<MatrixIndexes, MatrixBlock> blocks = out.getBinaryBlockedRDD("output"); MatrixCharacteristics mcOut = out.getMatrixCharacteristics("output"); return MLMatrix.createMLMatrix(this, sparkSession, blocks, mcOut); - } + } + + /** + * Experimental API: Might be discontinued in future release + * @param sqlContext the SQL Context + * @param filePath the file path + * @param format the format + * @return the MLMatrix + * @throws IOException if IOException occurs + * @throws DMLException if DMLException occurs + * @throws ParseException if ParseException occurs + */ + public MLMatrix read(SQLContext sqlContext, String filePath, String format) throws IOException, DMLException, ParseException { + SparkSession sparkSession = sqlContext.sparkSession(); + return read(sparkSession, filePath, format); + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/api/MLMatrix.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/MLMatrix.java b/src/main/java/org/apache/sysml/api/MLMatrix.java index 873e831..45f631f 100644 --- a/src/main/java/org/apache/sysml/api/MLMatrix.java +++ b/src/main/java/org/apache/sysml/api/MLMatrix.java @@ -25,6 +25,7 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.encoders.RowEncoder; import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; @@ -76,11 +77,21 @@ public class 
MLMatrix extends Dataset<Row> { this.ml = ml; } + protected MLMatrix(SQLContext sqlContext, LogicalPlan logicalPlan, MLContext ml) { + super(sqlContext, logicalPlan, RowEncoder.apply(null)); + this.ml = ml; + } + protected MLMatrix(SparkSession sparkSession, QueryExecution queryExecution, MLContext ml) { super(sparkSession, queryExecution, RowEncoder.apply(null)); this.ml = ml; } - + + protected MLMatrix(SQLContext sqlContext, QueryExecution queryExecution, MLContext ml) { + super(sqlContext.sparkSession(), queryExecution, RowEncoder.apply(null)); + this.ml = ml; + } + // Only used internally to set a new MLMatrix after one of matrix operations. // Not to be used externally. protected MLMatrix(Dataset<Row> df, MatrixCharacteristics mc, MLContext ml) throws DMLRuntimeException { @@ -110,7 +121,12 @@ public class MLMatrix extends Dataset<Row> { StructType schema = MLBlock.getDefaultSchemaForBinaryBlock(); return new MLMatrix(sparkSession.createDataFrame(rows.toJavaRDD(), schema), mc, ml); } - + + static MLMatrix createMLMatrix(MLContext ml, SQLContext sqlContext, JavaPairRDD<MatrixIndexes, MatrixBlock> blocks, MatrixCharacteristics mc) throws DMLRuntimeException { + SparkSession sparkSession = sqlContext.sparkSession(); + return createMLMatrix(ml, sparkSession, blocks, mc); + } + /** * Convenient method to write a MLMatrix. 
* http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/api/MLOutput.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/MLOutput.java b/src/main/java/org/apache/sysml/api/MLOutput.java index ca90fc9..a16eccd 100644 --- a/src/main/java/org/apache/sysml/api/MLOutput.java +++ b/src/main/java/org/apache/sysml/api/MLOutput.java @@ -26,6 +26,7 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.StructType; import org.apache.sysml.runtime.DMLRuntimeException; @@ -107,7 +108,24 @@ public class MLOutput { } throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table."); } - + + /** + * Note, the output DataFrame has an additional column ID. 
+ * An easy way to get DataFrame without ID is by df.drop("__INDEX") + * + * @param sqlContext the SQL Context + * @param varName the variable name + * @return the DataFrame + * @throws DMLRuntimeException if DMLRuntimeException occurs + */ + public Dataset<Row> getDF(SQLContext sqlContext, String varName) throws DMLRuntimeException { + if (sqlContext == null) { + throw new DMLRuntimeException("SQLContext is not created"); + } + SparkSession sparkSession = sqlContext.sparkSession(); + return getDF(sparkSession, varName); + } + /** * Obtain the DataFrame * @@ -134,7 +152,24 @@ public class MLOutput { } } - + + /** + * Obtain the DataFrame + * + * @param sqlContext the SQL Context + * @param varName the variable name + * @param outputVector if true, returns DataFrame with two columns: ID and org.apache.spark.ml.linalg.Vector + * @return the DataFrame + * @throws DMLRuntimeException if DMLRuntimeException occurs + */ + public Dataset<Row> getDF(SQLContext sqlContext, String varName, boolean outputVector) throws DMLRuntimeException { + if (sqlContext == null) { + throw new DMLRuntimeException("SQLContext is not created"); + } + SparkSession sparkSession = sqlContext.sparkSession(); + return getDF(sparkSession, varName, outputVector); + } + /** * This method improves the performance of MLPipeline wrappers. * * @@ -153,6 +188,25 @@ public class MLOutput { JavaPairRDD<MatrixIndexes,MatrixBlock> binaryBlockRDD = getBinaryBlockedRDD(varName); return RDDConverterUtils.binaryBlockToDataFrame(sparkSession, binaryBlockRDD, mc, true); } + + /** + * This method improves the performance of MLPipeline wrappers. 
+ * + * @param sqlContext the SQL Context + * @param varName the variable name + * @param mc the matrix characteristics + * @return the DataFrame + * @throws DMLRuntimeException if DMLRuntimeException occurs + */ + public Dataset<Row> getDF(SQLContext sqlContext, String varName, MatrixCharacteristics mc) + throws DMLRuntimeException + { + if (sqlContext == null) { + throw new DMLRuntimeException("SQLContext is not created"); + } + SparkSession sparkSession = sqlContext.sparkSession(); + return getDF(sparkSession, varName, mc); + } public JavaRDD<String> getStringRDD(String varName, String format) throws DMLRuntimeException { if(format.equals("text")) { @@ -202,4 +256,12 @@ public class MLOutput { } throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table."); } + + public MLMatrix getMLMatrix(MLContext ml, SQLContext sqlContext, String varName) throws DMLRuntimeException { + if (sqlContext == null) { + throw new DMLRuntimeException("SQLContext is not created"); + } + SparkSession sparkSession = sqlContext.sparkSession(); + return getMLMatrix(ml, sparkSession, varName); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java index 3d5df56..013a1a8 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java @@ -42,6 +42,7 @@ import org.apache.spark.ml.linalg.VectorUDT; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; +import 
org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; @@ -285,6 +286,14 @@ public class FrameRDDConverterUtils return sparkSession.createDataFrame(rowRDD, dfSchema); } + @Deprecated + public static Dataset<Row> binaryBlockToDataFrame(SQLContext sqlContext, JavaPairRDD<Long,FrameBlock> in, + MatrixCharacteristics mc, ValueType[] schema) + { + SparkSession sparkSession = sqlContext.sparkSession(); + return binaryBlockToDataFrame(sparkSession, in, mc, schema); + } + /** * This function will convert Frame schema into DataFrame schema http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java index 6b6c61d..e847471 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java @@ -42,6 +42,7 @@ import org.apache.spark.ml.linalg.Vectors; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; @@ -286,6 +287,14 @@ public class RDDConverterUtils return sparkSession.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields)); } + @Deprecated + public static Dataset<Row> binaryBlockToDataFrame(SQLContext sqlContext, + JavaPairRDD<MatrixIndexes, MatrixBlock> in, MatrixCharacteristics mc, boolean toVector) + { + SparkSession sparkSession = 
sqlContext.sparkSession(); + return binaryBlockToDataFrame(sparkSession, in, mc, toVector); + } + public static JavaPairRDD<LongWritable, Text> stringToSerializableText(JavaPairRDD<Long,String> in) { return in.mapToPair(new TextToSerTextFunction()); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e7510a03/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java index f4a02dd..973db64 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java @@ -236,7 +236,22 @@ public class RDDConverterUtilsExt JavaRDD<Row> newRows = df.rdd().toJavaRDD().zipWithIndex().map(new AddRowID()); return sparkSession.createDataFrame(newRows, new StructType(newSchema)); } - + + /** + * Add element indices as new column to DataFrame + * + * @param df input data frame + * @param sqlContext the SQL Context + * @param nameOfCol name of index column + * @return new data frame + * + * @deprecated This will be removed in SystemML 1.0. + */ + @Deprecated + public static Dataset<Row> addIDToDataFrame(Dataset<Row> df, SQLContext sqlContext, String nameOfCol) { + SparkSession sparkSession = sqlContext.sparkSession(); + return addIDToDataFrame(df, sparkSession, nameOfCol); + } private static class MatrixEntryToBinaryBlockFunction implements PairFlatMapFunction<Iterator<MatrixEntry>,MatrixIndexes,MatrixBlock>
