[SYSTEMML-1194] Replace SQLContext with SparkSession

SQLContext constructors have been deprecated in Spark 2. Replace SQLContext references with SparkSession.
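For reference, the listing below is a minimal, self-contained sketch of the replacement pattern this commit applies throughout the code base; the class name and sample data are illustrative only and are not taken from the patch. A SparkSession is obtained from its builder, optionally wrapping an existing SparkContext as the converter utilities do, and DataFrames are created from the session instead of from a deprecated SQLContext.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Illustrative example, not part of the SystemML patch.
public class SparkSessionMigrationExample {
    public static void main(String[] args) {
        // Spark 1.x style (deprecated in Spark 2):
        //   SQLContext sqlContext = new SQLContext(jsc);
        //   Dataset<Row> df = sqlContext.createDataFrame(rows, schema);

        // Spark 2 style: obtain (or create) a SparkSession instead.
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("SparkSessionMigrationExample")
                .getOrCreate();

        // Where only a (Java)SparkContext is at hand, as in the converter
        // utilities touched by this patch, a session can be built from it.
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        SparkSession session = SparkSession.builder()
                .sparkContext(jsc.sc())
                .getOrCreate();

        // DataFrame creation now goes through the session.
        StructType schema = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("C1", DataTypes.StringType, true)));
        List<Row> rows = Arrays.asList(RowFactory.create("1.2, 3.4"));
        Dataset<Row> df = session.createDataFrame(rows, schema);
        df.show();

        spark.stop();
    }
}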
Closes #360. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/80ab57bd Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/80ab57bd Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/80ab57bd Branch: refs/heads/master Commit: 80ab57bda803111e566284aed31ca241ad73edee Parents: 201238f Author: Deron Eriksson <[email protected]> Authored: Wed Feb 15 11:28:28 2017 -0800 Committer: Deron Eriksson <[email protected]> Committed: Wed Feb 15 11:28:28 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/sysml/api/MLContext.java | 14 +-- .../java/org/apache/sysml/api/MLMatrix.java | 28 +++--- .../java/org/apache/sysml/api/MLOutput.java | 50 ++++++----- .../api/mlcontext/MLContextConversionUtil.java | 9 +- .../org/apache/sysml/api/python/SystemML.py | 34 ++------ .../spark/utils/FrameRDDConverterUtils.java | 13 ++- .../spark/utils/RDDConverterUtils.java | 13 ++- .../spark/utils/RDDConverterUtilsExt.java | 44 ++++++++-- src/main/python/systemml/converters.py | 4 +- src/main/python/systemml/defmatrix.py | 8 +- src/main/python/systemml/mllearn/estimators.py | 76 ++++++++--------- src/main/python/tests/test_mllearn_df.py | 12 +-- src/main/python/tests/test_mllearn_numpy.py | 20 ++--- .../sysml/api/ml/BaseSystemMLClassifier.scala | 4 +- .../sysml/api/ml/BaseSystemMLRegressor.scala | 2 +- .../sysml/api/ml/LogisticRegression.scala | 10 +-- .../test/integration/AutomatedTestBase.java | 4 +- .../conversion/RDDConverterUtilsExtTest.java | 20 ++--- .../functions/frame/FrameConverterTest.java | 9 +- .../DataFrameMatrixConversionTest.java | 8 +- .../DataFrameRowFrameConversionTest.java | 13 +-- .../DataFrameVectorFrameConversionTest.java | 24 ++---- .../mlcontext/DataFrameVectorScriptTest.java | 24 ++---- .../functions/mlcontext/FrameTest.java | 10 +-- .../mlcontext/MLContextFrameTest.java | 27 +++--- .../integration/mlcontext/MLContextTest.java | 90 ++++++++++---------- .../sysml/api/ml/LogisticRegressionSuite.scala | 9 +- .../sysml/api/ml/WrapperSparkContext.scala | 16 ++-- 28 files changed, 294 insertions(+), 301 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/api/MLContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/MLContext.java b/src/main/java/org/apache/sysml/api/MLContext.java index 520e51e..a128c37 100644 --- a/src/main/java/org/apache/sysml/api/MLContext.java +++ b/src/main/java/org/apache/sysml/api/MLContext.java @@ -37,7 +37,7 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; import org.apache.sysml.api.jmlc.JMLCUtils; import org.apache.sysml.api.mlcontext.ScriptType; @@ -101,9 +101,9 @@ import org.apache.sysml.utils.Statistics; * <p> * Create input DataFrame from CSV file and potentially perform some feature transformation * <pre><code> - * scala> val W = sqlContext.load("com.databricks.spark.csv", Map("path" -> "W.csv", "header" -> "false")) - * scala> val H = sqlContext.load("com.databricks.spark.csv", Map("path" -> "H.csv", "header" -> "false")) - * scala> val V = 
sqlContext.load("com.databricks.spark.csv", Map("path" -> "V.csv", "header" -> "false")) + * scala> val W = sparkSession.load("com.databricks.spark.csv", Map("path" -> "W.csv", "header" -> "false")) + * scala> val H = sparkSession.load("com.databricks.spark.csv", Map("path" -> "H.csv", "header" -> "false")) + * scala> val V = sparkSession.load("com.databricks.spark.csv", Map("path" -> "V.csv", "header" -> "false")) * </code></pre> * <p> * Create MLContext @@ -1578,7 +1578,7 @@ public class MLContext { // TODO: Add additional create to provide sep, missing values, etc. for CSV /** * Experimental API: Might be discontinued in future release - * @param sqlContext the SQLContext + * @param sparkSession the Spark Session * @param filePath the file path * @param format the format * @return the MLMatrix @@ -1586,12 +1586,12 @@ public class MLContext { * @throws DMLException if DMLException occurs * @throws ParseException if ParseException occurs */ - public MLMatrix read(SQLContext sqlContext, String filePath, String format) throws IOException, DMLException, ParseException { + public MLMatrix read(SparkSession sparkSession, String filePath, String format) throws IOException, DMLException, ParseException { this.reset(); this.registerOutput("output"); MLOutput out = this.executeScript("output = read(\"" + filePath + "\", format=\"" + format + "\"); " + MLMatrix.writeStmt); JavaPairRDD<MatrixIndexes, MatrixBlock> blocks = out.getBinaryBlockedRDD("output"); MatrixCharacteristics mcOut = out.getMatrixCharacteristics("output"); - return MLMatrix.createMLMatrix(this, sqlContext, blocks, mcOut); + return MLMatrix.createMLMatrix(this, sparkSession, blocks, mcOut); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/api/MLMatrix.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/MLMatrix.java b/src/main/java/org/apache/sysml/api/MLMatrix.java index a8afa76..873e831 100644 --- a/src/main/java/org/apache/sysml/api/MLMatrix.java +++ b/src/main/java/org/apache/sysml/api/MLMatrix.java @@ -25,10 +25,10 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SQLContext; -import org.apache.spark.sql.execution.QueryExecution; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.encoders.RowEncoder; import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.execution.QueryExecution; import org.apache.spark.sql.types.StructType; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.DMLRuntimeException; @@ -57,8 +57,8 @@ import scala.Tuple2; import org.apache.sysml.api.{MLContext, MLMatrix} val ml = new MLContext(sc) - val mat1 = ml.read(sqlContext, "V_small.csv", "csv") - val mat2 = ml.read(sqlContext, "W_small.mtx", "binary") + val mat1 = ml.read(sparkSession, "V_small.csv", "csv") + val mat2 = ml.read(sparkSession, "W_small.mtx", "binary") val result = mat1.transpose() %*% mat2 result.write("Result_small.mtx", "text") @@ -71,20 +71,20 @@ public class MLMatrix extends Dataset<Row> { protected MatrixCharacteristics mc = null; protected MLContext ml = null; - protected MLMatrix(SQLContext sqlContext, LogicalPlan logicalPlan, MLContext ml) { - super(sqlContext, logicalPlan, RowEncoder.apply(null)); + protected MLMatrix(SparkSession sparkSession, LogicalPlan logicalPlan, MLContext ml) { + 
super(sparkSession, logicalPlan, RowEncoder.apply(null)); this.ml = ml; } - protected MLMatrix(SQLContext sqlContext, QueryExecution queryExecution, MLContext ml) { - super(sqlContext.sparkSession(), queryExecution, RowEncoder.apply(null)); + protected MLMatrix(SparkSession sparkSession, QueryExecution queryExecution, MLContext ml) { + super(sparkSession, queryExecution, RowEncoder.apply(null)); this.ml = ml; } // Only used internally to set a new MLMatrix after one of matrix operations. // Not to be used externally. protected MLMatrix(Dataset<Row> df, MatrixCharacteristics mc, MLContext ml) throws DMLRuntimeException { - super(df.sqlContext(), df.logicalPlan(), RowEncoder.apply(null)); + super(df.sparkSession(), df.logicalPlan(), RowEncoder.apply(null)); this.mc = mc; this.ml = ml; } @@ -105,10 +105,10 @@ public class MLMatrix extends Dataset<Row> { // } // ------------------------------------------------------------------------------------------------ - static MLMatrix createMLMatrix(MLContext ml, SQLContext sqlContext, JavaPairRDD<MatrixIndexes, MatrixBlock> blocks, MatrixCharacteristics mc) throws DMLRuntimeException { + static MLMatrix createMLMatrix(MLContext ml, SparkSession sparkSession, JavaPairRDD<MatrixIndexes, MatrixBlock> blocks, MatrixCharacteristics mc) throws DMLRuntimeException { RDD<Row> rows = blocks.map(new GetMLBlock()).rdd(); StructType schema = MLBlock.getDefaultSchemaForBinaryBlock(); - return new MLMatrix(sqlContext.createDataFrame(rows.toJavaRDD(), schema), mc, ml); + return new MLMatrix(sparkSession.createDataFrame(rows.toJavaRDD(), schema), mc, ml); } /** @@ -233,7 +233,7 @@ public class MLMatrix extends Dataset<Row> { RDD<Row> rows = out.getBinaryBlockedRDD("output").map(new GetMLBlock()).rdd(); StructType schema = MLBlock.getDefaultSchemaForBinaryBlock(); MatrixCharacteristics mcOut = out.getMatrixCharacteristics("output"); - return new MLMatrix(this.sqlContext().createDataFrame(rows.toJavaRDD(), schema), mcOut, ml); + return new MLMatrix(this.sparkSession().createDataFrame(rows.toJavaRDD(), schema), mcOut, ml); } private MLMatrix scalarBinaryOp(Double scalar, String op, boolean isScalarLeft) throws IOException, DMLException { @@ -244,7 +244,7 @@ public class MLMatrix extends Dataset<Row> { RDD<Row> rows = out.getBinaryBlockedRDD("output").map(new GetMLBlock()).rdd(); StructType schema = MLBlock.getDefaultSchemaForBinaryBlock(); MatrixCharacteristics mcOut = out.getMatrixCharacteristics("output"); - return new MLMatrix(this.sqlContext().createDataFrame(rows.toJavaRDD(), schema), mcOut, ml); + return new MLMatrix(this.sparkSession().createDataFrame(rows.toJavaRDD(), schema), mcOut, ml); } // --------------------------------------------------- @@ -349,7 +349,7 @@ public class MLMatrix extends Dataset<Row> { RDD<Row> rows = out.getBinaryBlockedRDD("output").map(new GetMLBlock()).rdd(); StructType schema = MLBlock.getDefaultSchemaForBinaryBlock(); MatrixCharacteristics mcOut = out.getMatrixCharacteristics("output"); - return new MLMatrix(this.sqlContext().createDataFrame(rows.toJavaRDD(), schema), mcOut, ml); + return new MLMatrix(this.sparkSession().createDataFrame(rows.toJavaRDD(), schema), mcOut, ml); } // TODO: For 'scalar op matrix' operations: Do implicit conversions http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/api/MLOutput.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/MLOutput.java 
b/src/main/java/org/apache/sysml/api/MLOutput.java index 0fcbfdf..ca90fc9 100644 --- a/src/main/java/org/apache/sysml/api/MLOutput.java +++ b/src/main/java/org/apache/sysml/api/MLOutput.java @@ -26,7 +26,7 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.StructType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; @@ -90,65 +90,68 @@ public class MLOutput { /** * Note, the output DataFrame has an additional column ID. * An easy way to get DataFrame without ID is by df.drop("__INDEX") - * @param sqlContext the SQLContext + * + * @param sparkSession the Spark Session * @param varName the variable name * @return the DataFrame * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public Dataset<Row> getDF(SQLContext sqlContext, String varName) throws DMLRuntimeException { - if(sqlContext == null) { - throw new DMLRuntimeException("SQLContext is not created."); + public Dataset<Row> getDF(SparkSession sparkSession, String varName) throws DMLRuntimeException { + if(sparkSession == null) { + throw new DMLRuntimeException("SparkSession is not created."); } JavaPairRDD<MatrixIndexes,MatrixBlock> rdd = getBinaryBlockedRDD(varName); if(rdd != null) { MatrixCharacteristics mc = _outMetadata.get(varName); - return RDDConverterUtils.binaryBlockToDataFrame(sqlContext, rdd, mc, false); + return RDDConverterUtils.binaryBlockToDataFrame(sparkSession, rdd, mc, false); } throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table."); } /** * Obtain the DataFrame - * @param sqlContext the SQLContext + * + * @param sparkSession the Spark Session * @param varName the variable name * @param outputVector if true, returns DataFrame with two column: ID and org.apache.spark.ml.linalg.Vector * @return the DataFrame * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public Dataset<Row> getDF(SQLContext sqlContext, String varName, boolean outputVector) throws DMLRuntimeException { - if(sqlContext == null) { - throw new DMLRuntimeException("SQLContext is not created."); + public Dataset<Row> getDF(SparkSession sparkSession, String varName, boolean outputVector) throws DMLRuntimeException { + if(sparkSession == null) { + throw new DMLRuntimeException("SparkSession is not created."); } if(outputVector) { JavaPairRDD<MatrixIndexes,MatrixBlock> rdd = getBinaryBlockedRDD(varName); if(rdd != null) { MatrixCharacteristics mc = _outMetadata.get(varName); - return RDDConverterUtils.binaryBlockToDataFrame(sqlContext, rdd, mc, true); + return RDDConverterUtils.binaryBlockToDataFrame(sparkSession, rdd, mc, true); } throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table."); } else { - return getDF(sqlContext, varName); + return getDF(sparkSession, varName); } } /** * This methods improves the performance of MLPipeline wrappers. 
- * @param sqlContext the SQLContext + * + * @param sparkSession the Spark Session * @param varName the variable name * @param mc the matrix characteristics * @return the DataFrame * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public Dataset<Row> getDF(SQLContext sqlContext, String varName, MatrixCharacteristics mc) + public Dataset<Row> getDF(SparkSession sparkSession, String varName, MatrixCharacteristics mc) throws DMLRuntimeException { - if(sqlContext == null) - throw new DMLRuntimeException("SQLContext is not created."); - + if(sparkSession == null) { + throw new DMLRuntimeException("SparkSession is not created."); + } JavaPairRDD<MatrixIndexes,MatrixBlock> binaryBlockRDD = getBinaryBlockedRDD(varName); - return RDDConverterUtils.binaryBlockToDataFrame(sqlContext, binaryBlockRDD, mc, true); + return RDDConverterUtils.binaryBlockToDataFrame(sparkSession, binaryBlockRDD, mc, true); } public JavaRDD<String> getStringRDD(String varName, String format) throws DMLRuntimeException { @@ -180,12 +183,13 @@ public class MLOutput { public Dataset<Row> getDataFrameRDD(String varName, JavaSparkContext jsc) throws DMLRuntimeException { JavaPairRDD<Long, FrameBlock> binaryRDD = getFrameBinaryBlockedRDD(varName); MatrixCharacteristics mcIn = getMatrixCharacteristics(varName); - return FrameRDDConverterUtils.binaryBlockToDataFrame(new SQLContext(jsc), binaryRDD, mcIn, null); + SparkSession sparkSession = SparkSession.builder().sparkContext(jsc.sc()).getOrCreate(); + return FrameRDDConverterUtils.binaryBlockToDataFrame(sparkSession, binaryRDD, mcIn, null); } - public MLMatrix getMLMatrix(MLContext ml, SQLContext sqlContext, String varName) throws DMLRuntimeException { - if(sqlContext == null) { - throw new DMLRuntimeException("SQLContext is not created."); + public MLMatrix getMLMatrix(MLContext ml, SparkSession sparkSession, String varName) throws DMLRuntimeException { + if(sparkSession == null) { + throw new DMLRuntimeException("SparkSession is not created."); } else if(ml == null) { throw new DMLRuntimeException("MLContext is not created."); @@ -194,7 +198,7 @@ public class MLOutput { if(rdd != null) { MatrixCharacteristics mc = getMatrixCharacteristics(varName); StructType schema = MLBlock.getDefaultSchemaForBinaryBlock(); - return new MLMatrix(sqlContext.createDataFrame(rdd.map(new GetMLBlock()).rdd(), schema), mc, ml); + return new MLMatrix(sparkSession.createDataFrame(rdd.map(new GetMLBlock()).rdd(), schema), mc, ml); } throw new DMLRuntimeException("Variable " + varName + " not found in the output symbol table."); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java index 5414e4d..0225ea8 100644 --- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java +++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java @@ -37,7 +37,7 @@ import org.apache.spark.ml.linalg.VectorUDT; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.sysml.api.MLContextProxy; @@ -1243,9 +1243,9 @@ 
public class MLContextConversionUtil { MatrixCharacteristics mc = matrixObject.getMatrixCharacteristics(); SparkContext sc = ((MLContext) MLContextProxy.getActiveMLContextForAPI()).getSparkContext(); - SQLContext sqlctx = new SQLContext(sc); + SparkSession sparkSession = SparkSession.builder().sparkContext(sc).getOrCreate(); - return RDDConverterUtils.binaryBlockToDataFrame(sqlctx, binaryBlockMatrix, mc, isVectorDF); + return RDDConverterUtils.binaryBlockToDataFrame(sparkSession, binaryBlockMatrix, mc, isVectorDF); } catch (DMLRuntimeException e) { throw new MLContextException("DMLRuntimeException while converting matrix object to DataFrame", e); @@ -1270,7 +1270,8 @@ public class MLContextConversionUtil { MatrixCharacteristics mc = frameObject.getMatrixCharacteristics(); JavaSparkContext jsc = MLContextUtil.getJavaSparkContext((MLContext) MLContextProxy.getActiveMLContextForAPI()); - return FrameRDDConverterUtils.binaryBlockToDataFrame(new SQLContext(jsc), binaryBlockFrame, mc, frameObject.getSchema()); + SparkSession sparkSession = SparkSession.builder().sparkContext(jsc.sc()).getOrCreate(); + return FrameRDDConverterUtils.binaryBlockToDataFrame(sparkSession, binaryBlockFrame, mc, frameObject.getSchema()); } catch (DMLRuntimeException e) { throw new MLContextException("DMLRuntimeException while converting frame object to DataFrame", e); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/api/python/SystemML.py ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/python/SystemML.py b/src/main/java/org/apache/sysml/api/python/SystemML.py index 3b8ae96..b22c570 100644 --- a/src/main/java/org/apache/sysml/api/python/SystemML.py +++ b/src/main/java/org/apache/sysml/api/python/SystemML.py @@ -25,7 +25,7 @@ from py4j.protocol import Py4JJavaError, Py4JError import traceback import os from pyspark.context import SparkContext -from pyspark.sql import DataFrame, SQLContext +from pyspark.sql import DataFrame, SparkSession from pyspark.rdd import RDD @@ -59,7 +59,7 @@ class MLContext(object): setForcedSparkExecType = (args[1] if len(args) > 1 else False) self.sc = sc self.ml = sc._jvm.org.apache.sysml.api.MLContext(sc._jsc, monitorPerformance, setForcedSparkExecType) - self.sqlCtx = SQLContext(sc) + self.sparkSession = SparkSession.builder.getOrCreate() except Py4JError: traceback.print_exc() @@ -212,41 +212,21 @@ class MLOutput(object): def getBinaryBlockedRDD(self, varName): raise Exception('Not supported in Python MLContext') - #try: - # rdd = RDD(self.jmlOut.getBinaryBlockedRDD(varName), self.sc) - # return rdd - #except Py4JJavaError: - # traceback.print_exc() def getMatrixCharacteristics(self, varName): raise Exception('Not supported in Python MLContext') - #try: - # chars = self.jmlOut.getMatrixCharacteristics(varName) - # return chars - #except Py4JJavaError: - # traceback.print_exc() - def getDF(self, sqlContext, varName): + def getDF(self, sparkSession, varName): try: - jdf = self.jmlOut.getDF(sqlContext._ssql_ctx, varName) - df = DataFrame(jdf, sqlContext) + jdf = self.jmlOut.getDF(sparkSession, varName) + df = DataFrame(jdf, sparkSession) return df except Py4JJavaError: traceback.print_exc() - - def getMLMatrix(self, sqlContext, varName): + + def getMLMatrix(self, sparkSession, varName): raise Exception('Not supported in Python MLContext') - #try: - # mlm = self.jmlOut.getMLMatrix(sqlContext._scala_SQLContext, varName) - # return mlm - #except Py4JJavaError: - # 
traceback.print_exc() def getStringRDD(self, varName, format): raise Exception('Not supported in Python MLContext') - #try: - # rdd = RDD(self.jmlOut.getStringRDD(varName, format), self.sc) - # return rdd - #except Py4JJavaError: - # traceback.print_exc() http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java index ae3b686..3d5df56 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java @@ -22,9 +22,9 @@ package org.apache.sysml.runtime.instructions.spark.utils; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,14 +42,11 @@ import org.apache.spark.ml.linalg.VectorUDT; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; -import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; - -import scala.Tuple2; - import org.apache.sysml.parser.Expression.ValueType; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType; @@ -70,6 +67,8 @@ import org.apache.sysml.runtime.util.DataConverter; import org.apache.sysml.runtime.util.FastStringTokenizer; import org.apache.sysml.runtime.util.UtilFunctions; +import scala.Tuple2; + public class FrameRDDConverterUtils @@ -267,7 +266,7 @@ public class FrameRDDConverterUtils new DataFrameToBinaryBlockFunction(mc, colnames, fschema, containsID, colVect)); } - public static Dataset<Row> binaryBlockToDataFrame(SQLContext sqlctx, JavaPairRDD<Long,FrameBlock> in, + public static Dataset<Row> binaryBlockToDataFrame(SparkSession sparkSession, JavaPairRDD<Long,FrameBlock> in, MatrixCharacteristics mc, ValueType[] schema) { if( !mc.colsKnown() ) @@ -283,7 +282,7 @@ public class FrameRDDConverterUtils StructType dfSchema = convertFrameSchemaToDFSchema(schema, true); //rdd to data frame conversion - return sqlctx.createDataFrame(rowRDD, dfSchema); + return sparkSession.createDataFrame(rowRDD, dfSchema); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java index 1310b80..4d95b95 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java @@ -34,21 +34,18 @@ import 
org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.ml.feature.LabeledPoint; import org.apache.spark.ml.linalg.DenseVector; import org.apache.spark.ml.linalg.Vector; import org.apache.spark.ml.linalg.VectorUDT; import org.apache.spark.ml.linalg.Vectors; -import org.apache.spark.ml.feature.LabeledPoint; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; -import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.util.LongAccumulator; - -import scala.Tuple2; - import org.apache.sysml.conf.ConfigurationManager; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.runtime.DMLRuntimeException; @@ -68,6 +65,8 @@ import org.apache.sysml.runtime.util.DataConverter; import org.apache.sysml.runtime.util.FastStringTokenizer; import org.apache.sysml.runtime.util.UtilFunctions; +import scala.Tuple2; + public class RDDConverterUtils { public static final String DF_ID_COLUMN = "__INDEX"; @@ -262,7 +261,7 @@ public class RDDConverterUtils return out; } - public static Dataset<Row> binaryBlockToDataFrame(SQLContext sqlctx, + public static Dataset<Row> binaryBlockToDataFrame(SparkSession sparkSession, JavaPairRDD<MatrixIndexes, MatrixBlock> in, MatrixCharacteristics mc, boolean toVector) { if( !mc.colsKnown() ) @@ -284,7 +283,7 @@ public class RDDConverterUtils } //rdd to data frame conversion - return sqlctx.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields)); + return sparkSession.createDataFrame(rowsRDD.rdd(), DataTypes.createStructType(fields)); } public static JavaPairRDD<LongWritable, Text> stringToSerializableText(JavaPairRDD<Long,String> in) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java index e3b4d0c..f4a02dd 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java @@ -28,7 +28,6 @@ import java.util.Iterator; import org.apache.hadoop.io.Text; import org.apache.spark.SparkContext; -import org.apache.spark.SparkException; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -44,6 +43,7 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -221,11 +221,11 @@ public class RDDConverterUtilsExt * Add element indices as new column to DataFrame * * @param df input data frame - * @param sqlContext SQL context + * @param sparkSession the Spark Session * @param nameOfCol name of index column * @return new data 
frame */ - public static Dataset<Row> addIDToDataFrame(Dataset<Row> df, SQLContext sqlContext, String nameOfCol) { + public static Dataset<Row> addIDToDataFrame(Dataset<Row> df, SparkSession sparkSession, String nameOfCol) { StructField[] oldSchema = df.schema().fields(); StructField[] newSchema = new StructField[oldSchema.length + 1]; for(int i = 0; i < oldSchema.length; i++) { @@ -234,7 +234,7 @@ public class RDDConverterUtilsExt newSchema[oldSchema.length] = DataTypes.createStructField(nameOfCol, DataTypes.DoubleType, false); // JavaRDD<Row> newRows = df.rdd().toJavaRDD().map(new AddRowID()); JavaRDD<Row> newRows = df.rdd().toJavaRDD().zipWithIndex().map(new AddRowID()); - return sqlContext.createDataFrame(newRows, new StructType(newSchema)); + return sparkSession.createDataFrame(newRows, new StructType(newSchema)); } @@ -378,9 +378,42 @@ public class RDDConverterUtilsExt * @return dataframe of ml.linalg.Vector rows * @throws DMLRuntimeException * if DMLRuntimeException occurs + * + * @deprecated This will be removed in SystemML 1.0. Please migrate to {@code + * RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(SparkSession, Dataset<Row>) } */ + @Deprecated public static Dataset<Row> stringDataFrameToVectorDataFrame(SQLContext sqlContext, Dataset<Row> inputDF) throws DMLRuntimeException { + SparkSession sparkSession = sqlContext.sparkSession(); + return stringDataFrameToVectorDataFrame(sparkSession, inputDF); + } + + /** + * Convert a dataframe of comma-separated string rows to a dataframe of + * ml.linalg.Vector rows. + * + * <p> + * Example input rows:<br> + * + * <code> + * ((1.2, 4.3, 3.4))<br> + * (1.2, 3.4, 2.2)<br> + * [[1.2, 34.3, 1.2, 1.25]]<br> + * [1.2, 3.4]<br> + * </code> + * + * @param sparkSession + * Spark Session + * @param inputDF + * dataframe of comma-separated row strings to convert to + * dataframe of ml.linalg.Vector rows + * @return dataframe of ml.linalg.Vector rows + * @throws DMLRuntimeException + * if DMLRuntimeException occurs + */ + public static Dataset<Row> stringDataFrameToVectorDataFrame(SparkSession sparkSession, Dataset<Row> inputDF) + throws DMLRuntimeException { StructField[] oldSchema = inputDF.schema().fields(); StructField[] newSchema = new StructField[oldSchema.length]; @@ -444,8 +477,7 @@ public class RDDConverterUtilsExt // output DF JavaRDD<Row> newRows = inputDF.rdd().toJavaRDD().zipWithIndex().map(new StringToVector()); - Dataset<Row> outDF = sqlContext.createDataFrame(newRows.rdd(), DataTypes.createStructType(newSchema)); - + Dataset<Row> outDF = sparkSession.createDataFrame(newRows.rdd(), DataTypes.createStructType(newSchema)); return outDF; } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/python/systemml/converters.py ---------------------------------------------------------------------- diff --git a/src/main/python/systemml/converters.py b/src/main/python/systemml/converters.py index c7ca8ef..2d32508 100644 --- a/src/main/python/systemml/converters.py +++ b/src/main/python/systemml/converters.py @@ -36,7 +36,7 @@ def getNumCols(numPyArr): return numPyArr.shape[1] -def convertToLabeledDF(sqlCtx, X, y=None): +def convertToLabeledDF(sparkSession, X, y=None): from pyspark.ml.feature import VectorAssembler if y is not None: pd1 = pd.DataFrame(X) @@ -49,7 +49,7 @@ def convertToLabeledDF(sqlCtx, X, y=None): inputColumns = ['C' + str(i) for i in pdf.columns] outputColumns = inputColumns assembler = VectorAssembler(inputCols=inputColumns, outputCol='features') - out = 
assembler.transform(sqlCtx.createDataFrame(pdf, outputColumns)) + out = assembler.transform(sparkSession.createDataFrame(pdf, outputColumns)) if y is not None: return out.select('features', 'label') else: http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/python/systemml/defmatrix.py ---------------------------------------------------------------------- diff --git a/src/main/python/systemml/defmatrix.py b/src/main/python/systemml/defmatrix.py index 5f973f6..3e13bf2 100644 --- a/src/main/python/systemml/defmatrix.py +++ b/src/main/python/systemml/defmatrix.py @@ -28,7 +28,7 @@ try: import py4j.java_gateway from py4j.java_gateway import JavaObject from pyspark import SparkContext - from pyspark.sql import DataFrame, SQLContext + from pyspark.sql import DataFrame, SparkSession import pyspark.mllib.common except ImportError: raise ImportError('Unable to import `pyspark`. Hint: Make sure you are running with PySpark.') @@ -46,7 +46,7 @@ def setSparkContext(sc): SparkContext """ matrix.sc = sc - matrix.sqlContext = SQLContext(sc) + matrix.sparkSession = SparkSession.builder.getOrCreate() matrix.ml = MLContext(matrix.sc) @@ -290,7 +290,7 @@ def solve(A, b): >>> import numpy as np >>> from sklearn import datasets >>> import SystemML as sml - >>> from pyspark.sql import SQLContext + >>> from pyspark.sql import SparkSession >>> diabetes = datasets.load_diabetes() >>> diabetes_X = diabetes.data[:, np.newaxis, 2] >>> X_train = diabetes_X[:-20] @@ -523,7 +523,7 @@ class matrix(object): if isinstance(self.eval_data, Matrix): self.eval_data = self.eval_data.toDF() return self.eval_data - self.eval_data = matrix.sqlContext.createDataFrame(self.toPandas()) + self.eval_data = matrix.sparkSession.createDataFrame(self.toPandas()) return self.eval_data def save(self, file, format='csv'): http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/python/systemml/mllearn/estimators.py ---------------------------------------------------------------------- diff --git a/src/main/python/systemml/mllearn/estimators.py b/src/main/python/systemml/mllearn/estimators.py index d4ece89..c4eaf3d 100644 --- a/src/main/python/systemml/mllearn/estimators.py +++ b/src/main/python/systemml/mllearn/estimators.py @@ -37,8 +37,8 @@ import math from ..converters import * from ..classloader import * -def assemble(sqlCtx, pdf, inputCols, outputCol): - tmpDF = sqlCtx.createDataFrame(pdf, list(pdf.columns)) +def assemble(sparkSession, pdf, inputCols, outputCol): + tmpDF = sparkSession.createDataFrame(pdf, list(pdf.columns)) assembler = VectorAssembler(inputCols=list(inputCols), outputCol=outputCol) return assembler.transform(tmpDF) @@ -129,7 +129,7 @@ class BaseSystemMLEstimator(Estimator): raise Exception('Number of rows of X and y should match') colNames = pdfX.columns pdfX[self.label_col] = pdfY[pdfY.columns[0]] - df = assemble(self.sqlCtx, pdfX, colNames, self.features_col).select(self.features_col, self.label_col) + df = assemble(self.sparkSession, pdfX, colNames, self.features_col).select(self.features_col, self.label_col) self.fit_df(df) else: numColsy = getNumCols(y) @@ -157,9 +157,9 @@ class BaseSystemMLEstimator(Estimator): if isinstance(X, SUPPORTED_TYPES): if self.transferUsingDF: pdfX = convertToPandasDF(X) - df = assemble(self.sqlCtx, pdfX, pdfX.columns, self.features_col).select(self.features_col) + df = assemble(self.sparkSession, pdfX, pdfX.columns, self.features_col).select(self.features_col) retjDF = self.model.transform(df._jdf) - retDF = DataFrame(retjDF, 
self.sqlCtx) + retDF = DataFrame(retjDF, self.sparkSession) retPDF = retDF.sort('__INDEX').select('prediction').toPandas() if isinstance(X, np.ndarray): return self.decode(retPDF.as_matrix().flatten()) @@ -182,7 +182,7 @@ class BaseSystemMLEstimator(Estimator): assembler = VectorAssembler(inputCols=X.columns, outputCol=self.features_col) df = assembler.transform(X) retjDF = self.model.transform(df._jdf) - retDF = DataFrame(retjDF, self.sqlCtx) + retDF = DataFrame(retjDF, self.sparkSession) # Return DF return retDF.sort('__INDEX') else: @@ -245,8 +245,8 @@ class LogisticRegression(BaseSystemMLClassifier): >>> from sklearn import datasets, neighbors >>> from systemml.mllearn import LogisticRegression - >>> from pyspark.sql import SQLContext - >>> sqlCtx = SQLContext(sc) + >>> from pyspark.sql import SparkSession + >>> sparkSession = SparkSession.builder.getOrCreate() >>> digits = datasets.load_digits() >>> X_digits = digits.data >>> y_digits = digits.target + 1 @@ -255,7 +255,7 @@ class LogisticRegression(BaseSystemMLClassifier): >>> y_train = y_digits[:.9 * n_samples] >>> X_test = X_digits[.9 * n_samples:] >>> y_test = y_digits[.9 * n_samples:] - >>> logistic = LogisticRegression(sqlCtx) + >>> logistic = LogisticRegression(sparkSession) >>> print('LogisticRegression score: %f' % logistic.fit(X_train, y_train).score(X_test, y_test)) MLPipeline way @@ -263,9 +263,9 @@ class LogisticRegression(BaseSystemMLClassifier): >>> from pyspark.ml import Pipeline >>> from systemml.mllearn import LogisticRegression >>> from pyspark.ml.feature import HashingTF, Tokenizer - >>> from pyspark.sql import SQLContext - >>> sqlCtx = SQLContext(sc) - >>> training = sqlCtx.createDataFrame([ + >>> from pyspark.sql import SparkSession + >>> sparkSession = SparkSession.builder.getOrCreate() + >>> training = sparkSession.createDataFrame([ >>> (0L, "a b c d e spark", 1.0), >>> (1L, "b d", 2.0), >>> (2L, "spark f g h", 1.0), @@ -281,10 +281,10 @@ class LogisticRegression(BaseSystemMLClassifier): >>> ], ["id", "text", "label"]) >>> tokenizer = Tokenizer(inputCol="text", outputCol="words") >>> hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=20) - >>> lr = LogisticRegression(sqlCtx) + >>> lr = LogisticRegression(sparkSession) >>> pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) >>> model = pipeline.fit(training) - >>> test = sqlCtx.createDataFrame([ + >>> test = sparkSession.createDataFrame([ >>> (12L, "spark i j k"), >>> (13L, "l m n"), >>> (14L, "mapreduce spark"), @@ -294,13 +294,13 @@ class LogisticRegression(BaseSystemMLClassifier): """ - def __init__(self, sqlCtx, penalty='l2', fit_intercept=True, max_iter=100, max_inner_iter=0, tol=0.000001, C=1.0, solver='newton-cg', transferUsingDF=False): + def __init__(self, sparkSession, penalty='l2', fit_intercept=True, max_iter=100, max_inner_iter=0, tol=0.000001, C=1.0, solver='newton-cg', transferUsingDF=False): """ Performs both binomial and multinomial logistic regression. 
Parameters ---------- - sqlCtx: PySpark SQLContext + sparkSession: PySpark SparkSession penalty: Only 'l2' supported fit_intercept: Specifies whether to add intercept or not (default: True) max_iter: Maximum number of outer (Fisher scoring) iterations (default: 100) @@ -309,8 +309,8 @@ class LogisticRegression(BaseSystemMLClassifier): C: 1/regularization parameter (default: 1.0) solver: Only 'newton-cg' solver supported """ - self.sqlCtx = sqlCtx - self.sc = sqlCtx._sc + self.sparkSession = sparkSession + self.sc = sparkSession._sc createJavaObject(self.sc, 'dummy') self.uid = "logReg" self.estimator = self.sc._jvm.org.apache.sysml.api.ml.LogisticRegression(self.uid, self.sc._jsc.sc()) @@ -340,7 +340,7 @@ class LinearRegression(BaseSystemMLRegressor): >>> import numpy as np >>> from sklearn import datasets >>> from systemml.mllearn import LinearRegression - >>> from pyspark.sql import SQLContext + >>> from pyspark.sql import SparkSession >>> # Load the diabetes dataset >>> diabetes = datasets.load_diabetes() >>> # Use only one feature @@ -352,7 +352,7 @@ class LinearRegression(BaseSystemMLRegressor): >>> diabetes_y_train = diabetes.target[:-20] >>> diabetes_y_test = diabetes.target[-20:] >>> # Create linear regression object - >>> regr = LinearRegression(sqlCtx, solver='newton-cg') + >>> regr = LinearRegression(sparkSession, solver='newton-cg') >>> # Train the model using the training sets >>> regr.fit(diabetes_X_train, diabetes_y_train) >>> # The mean square error @@ -361,13 +361,13 @@ class LinearRegression(BaseSystemMLRegressor): """ - def __init__(self, sqlCtx, fit_intercept=True, max_iter=100, tol=0.000001, C=1.0, solver='newton-cg', transferUsingDF=False): + def __init__(self, sparkSession, fit_intercept=True, max_iter=100, tol=0.000001, C=1.0, solver='newton-cg', transferUsingDF=False): """ Performs linear regression to model the relationship between one numerical response variable and one or more explanatory (feature) variables. Parameters ---------- - sqlCtx: PySpark SQLContext + sparkSession: PySpark SparkSession fit_intercept: Specifies whether to add intercept or not (default: True) max_iter: Maximum number of conjugate gradient iterations, or 0 if no maximum limit provided (default: 100) tol: Tolerance used in the convergence criterion (default: 0.000001) @@ -377,8 +377,8 @@ class LinearRegression(BaseSystemMLRegressor): 'direct-solve' solver is more efficient when the number of features is relatively small (m < 1000) and input matrix X is either tall or fairly dense; otherwise 'newton-cg' solver is more efficient. 
""" - self.sqlCtx = sqlCtx - self.sc = sqlCtx._sc + self.sparkSession = sparkSession + self.sc = sparkSession._sc createJavaObject(self.sc, 'dummy') self.uid = "lr" if solver == 'newton-cg' or solver == 'direct-solve': @@ -405,8 +405,8 @@ class SVM(BaseSystemMLClassifier): >>> from sklearn import datasets, neighbors >>> from systemml.mllearn import SVM - >>> from pyspark.sql import SQLContext - >>> sqlCtx = SQLContext(sc) + >>> from pyspark.sql import SparkSession + >>> sparkSession = SparkSession.builder.getOrCreate() >>> digits = datasets.load_digits() >>> X_digits = digits.data >>> y_digits = digits.target @@ -415,27 +415,27 @@ class SVM(BaseSystemMLClassifier): >>> y_train = y_digits[:.9 * n_samples] >>> X_test = X_digits[.9 * n_samples:] >>> y_test = y_digits[.9 * n_samples:] - >>> svm = SVM(sqlCtx, is_multi_class=True) + >>> svm = SVM(sparkSession, is_multi_class=True) >>> print('LogisticRegression score: %f' % svm.fit(X_train, y_train).score(X_test, y_test)) """ - def __init__(self, sqlCtx, fit_intercept=True, max_iter=100, tol=0.000001, C=1.0, is_multi_class=False, transferUsingDF=False): + def __init__(self, sparkSession, fit_intercept=True, max_iter=100, tol=0.000001, C=1.0, is_multi_class=False, transferUsingDF=False): """ Performs both binary-class and multiclass SVM (Support Vector Machines). Parameters ---------- - sqlCtx: PySpark SQLContext + sparkSession: PySpark SparkSession fit_intercept: Specifies whether to add intercept or not (default: True) max_iter: Maximum number iterations (default: 100) tol: Tolerance used in the convergence criterion (default: 0.000001) C: 1/regularization parameter (default: 1.0) is_multi_class: Specifies whether to use binary-class SVM or multi-class SVM algorithm (default: False) """ - self.sqlCtx = sqlCtx - self.sc = sqlCtx._sc + self.sparkSession = sparkSession + self.sc = sparkSession._sc self.uid = "svm" createJavaObject(self.sc, 'dummy') self.estimator = self.sc._jvm.org.apache.sysml.api.ml.SVM(self.uid, self.sc._jsc.sc(), is_multi_class) @@ -461,8 +461,8 @@ class NaiveBayes(BaseSystemMLClassifier): >>> from sklearn.feature_extraction.text import TfidfVectorizer >>> from systemml.mllearn import NaiveBayes >>> from sklearn import metrics - >>> from pyspark.sql import SQLContext - >>> sqlCtx = SQLContext(sc) + >>> from pyspark.sql import SparkSession + >>> sparkSession = SparkSession.builder.getOrCreate(sc) >>> categories = ['alt.atheism', 'talk.religion.misc', 'comp.graphics', 'sci.space'] >>> newsgroups_train = fetch_20newsgroups(subset='train', categories=categories) >>> newsgroups_test = fetch_20newsgroups(subset='test', categories=categories) @@ -470,24 +470,24 @@ class NaiveBayes(BaseSystemMLClassifier): >>> # Both vectors and vectors_test are SciPy CSR matrix >>> vectors = vectorizer.fit_transform(newsgroups_train.data) >>> vectors_test = vectorizer.transform(newsgroups_test.data) - >>> nb = NaiveBayes(sqlCtx) + >>> nb = NaiveBayes(sparkSession) >>> nb.fit(vectors, newsgroups_train.target) >>> pred = nb.predict(vectors_test) >>> metrics.f1_score(newsgroups_test.target, pred, average='weighted') """ - def __init__(self, sqlCtx, laplace=1.0, transferUsingDF=False): + def __init__(self, sparkSession, laplace=1.0, transferUsingDF=False): """ Performs Naive Bayes. 
Parameters ---------- - sqlCtx: PySpark SQLContext + sparkSession: PySpark SparkSession laplace: Laplace smoothing specified by the user to avoid creation of 0 probabilities (default: 1.0) """ - self.sqlCtx = sqlCtx - self.sc = sqlCtx._sc + self.sparkSession = sparkSession + self.sc = sparkSession._sc self.uid = "nb" createJavaObject(self.sc, 'dummy') self.estimator = self.sc._jvm.org.apache.sysml.api.ml.NaiveBayes(self.uid, self.sc._jsc.sc()) http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/python/tests/test_mllearn_df.py ---------------------------------------------------------------------- diff --git a/src/main/python/tests/test_mllearn_df.py b/src/main/python/tests/test_mllearn_df.py index 0d6a4b4..da49953 100644 --- a/src/main/python/tests/test_mllearn_df.py +++ b/src/main/python/tests/test_mllearn_df.py @@ -36,7 +36,7 @@ import numpy as np from pyspark.context import SparkContext from pyspark.ml import Pipeline from pyspark.ml.feature import HashingTF, Tokenizer -from pyspark.sql import SQLContext +from pyspark.sql import SparkSession from sklearn import datasets, metrics, neighbors from sklearn.datasets import fetch_20newsgroups from sklearn.feature_extraction.text import TfidfVectorizer @@ -44,7 +44,7 @@ from sklearn.feature_extraction.text import TfidfVectorizer from systemml.mllearn import LinearRegression, LogisticRegression, NaiveBayes, SVM sc = SparkContext() -sqlCtx = SQLContext(sc) +sparkSession = SparkSession.builder.getOrCreate() # Currently not integrated with JUnit test # ~/spark-1.6.1-scala-2.11/bin/spark-submit --master local[*] --driver-class-path SystemML.jar test.py @@ -60,7 +60,7 @@ class TestMLLearn(unittest.TestCase): X_test = X_digits[int(.9 * n_samples):] y_test = y_digits[int(.9 * n_samples):] # Convert to DataFrame for i/o: current way to transfer data - logistic = LogisticRegression(sqlCtx, transferUsingDF=True) + logistic = LogisticRegression(sparkSession, transferUsingDF=True) score = logistic.fit(X_train, y_train).score(X_test, y_test) self.failUnless(score > 0.9) @@ -71,7 +71,7 @@ class TestMLLearn(unittest.TestCase): diabetes_X_test = diabetes_X[-20:] diabetes_y_train = diabetes.target[:-20] diabetes_y_test = diabetes.target[-20:] - regr = LinearRegression(sqlCtx, transferUsingDF=True) + regr = LinearRegression(sparkSession, transferUsingDF=True) regr.fit(diabetes_X_train, diabetes_y_train) score = regr.score(diabetes_X_test, diabetes_y_test) self.failUnless(score > 0.4) # TODO: Improve r2-score (may be I am using it incorrectly) @@ -85,7 +85,7 @@ class TestMLLearn(unittest.TestCase): y_train = y_digits[:int(.9 * n_samples)] X_test = X_digits[int(.9 * n_samples):] y_test = y_digits[int(.9 * n_samples):] - svm = SVM(sqlCtx, is_multi_class=True, transferUsingDF=True) + svm = SVM(sparkSession, is_multi_class=True, transferUsingDF=True) score = svm.fit(X_train, y_train).score(X_test, y_test) self.failUnless(score > 0.9) @@ -97,7 +97,7 @@ class TestMLLearn(unittest.TestCase): # # Both vectors and vectors_test are SciPy CSR matrix # vectors = vectorizer.fit_transform(newsgroups_train.data) # vectors_test = vectorizer.transform(newsgroups_test.data) - # nb = NaiveBayes(sqlCtx) + # nb = NaiveBayes(sparkSession) # nb.fit(vectors, newsgroups_train.target) # pred = nb.predict(vectors_test) # score = metrics.f1_score(newsgroups_test.target, pred, average='weighted') http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/python/tests/test_mllearn_numpy.py 
---------------------------------------------------------------------- diff --git a/src/main/python/tests/test_mllearn_numpy.py b/src/main/python/tests/test_mllearn_numpy.py index d030837..925554f 100644 --- a/src/main/python/tests/test_mllearn_numpy.py +++ b/src/main/python/tests/test_mllearn_numpy.py @@ -36,7 +36,7 @@ import numpy as np from pyspark.context import SparkContext from pyspark.ml import Pipeline from pyspark.ml.feature import HashingTF, Tokenizer -from pyspark.sql import SQLContext +from pyspark.sql import SparkSession from sklearn import datasets, metrics, neighbors from sklearn.datasets import fetch_20newsgroups from sklearn.feature_extraction.text import TfidfVectorizer @@ -44,7 +44,7 @@ from sklearn.feature_extraction.text import TfidfVectorizer from systemml.mllearn import LinearRegression, LogisticRegression, NaiveBayes, SVM sc = SparkContext() -sqlCtx = SQLContext(sc) +sparkSession = SparkSession.builder.getOrCreate() # Currently not integrated with JUnit test # ~/spark-1.6.1-scala-2.11/bin/spark-submit --master local[*] --driver-class-path SystemML.jar test.py @@ -58,12 +58,12 @@ class TestMLLearn(unittest.TestCase): y_train = y_digits[:int(.9 * n_samples)] X_test = X_digits[int(.9 * n_samples):] y_test = y_digits[int(.9 * n_samples):] - logistic = LogisticRegression(sqlCtx) + logistic = LogisticRegression(sparkSession) score = logistic.fit(X_train, y_train).score(X_test, y_test) self.failUnless(score > 0.9) def test_logistic_mlpipeline(self): - training = sqlCtx.createDataFrame([ + training = sparkSession.createDataFrame([ ("a b c d e spark", 1.0), ("b d", 2.0), ("spark f g h", 1.0), @@ -79,10 +79,10 @@ class TestMLLearn(unittest.TestCase): ], ["text", "label"]) tokenizer = Tokenizer(inputCol="text", outputCol="words") hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=20) - lr = LogisticRegression(sqlCtx) + lr = LogisticRegression(sparkSession) pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) model = pipeline.fit(training) - test = sqlCtx.createDataFrame([ + test = sparkSession.createDataFrame([ ("spark i j k", 1.0), ("l m n", 2.0), ("mapreduce spark", 1.0), @@ -101,7 +101,7 @@ class TestMLLearn(unittest.TestCase): diabetes_X_test = diabetes_X[-20:] diabetes_y_train = diabetes.target[:-20] diabetes_y_test = diabetes.target[-20:] - regr = LinearRegression(sqlCtx) + regr = LinearRegression(sparkSession) regr.fit(diabetes_X_train, diabetes_y_train) score = regr.score(diabetes_X_test, diabetes_y_test) self.failUnless(score > 0.4) # TODO: Improve r2-score (may be I am using it incorrectly) @@ -115,7 +115,7 @@ class TestMLLearn(unittest.TestCase): y_train = y_digits[:int(.9 * n_samples)] X_test = X_digits[int(.9 * n_samples):] y_test = y_digits[int(.9 * n_samples):] - svm = SVM(sqlCtx, is_multi_class=True) + svm = SVM(sparkSession, is_multi_class=True) score = svm.fit(X_train, y_train).score(X_test, y_test) self.failUnless(score > 0.9) @@ -128,7 +128,7 @@ class TestMLLearn(unittest.TestCase): y_train = y_digits[:int(.9 * n_samples)] X_test = X_digits[int(.9 * n_samples):] y_test = y_digits[int(.9 * n_samples):] - nb = NaiveBayes(sqlCtx) + nb = NaiveBayes(sparkSession) score = nb.fit(X_train, y_train).score(X_test, y_test) self.failUnless(score > 0.8) @@ -140,7 +140,7 @@ class TestMLLearn(unittest.TestCase): # # Both vectors and vectors_test are SciPy CSR matrix # vectors = vectorizer.fit_transform(newsgroups_train.data) # vectors_test = vectorizer.transform(newsgroups_test.data) - # nb = NaiveBayes(sqlCtx) + # nb = NaiveBayes(sparkSession) # 
nb.fit(vectors, newsgroups_train.target) # pred = nb.predict(vectors_test) # score = metrics.f1_score(newsgroups_test.target, pred, average='weighted') http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala index 1145463..fb9697d 100644 --- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala +++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala @@ -152,11 +152,11 @@ trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel { if(outputProb) { val prob = modelPredict.getDataFrame(probVar, true).withColumnRenamed("C1", "probability").select(RDDConverterUtils.DF_ID_COLUMN, "probability") - val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sqlContext, RDDConverterUtils.DF_ID_COLUMN) + val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sparkSession, RDDConverterUtils.DF_ID_COLUMN) return PredictionUtils.joinUsingID(dataset, PredictionUtils.joinUsingID(prob, predictedDF)) } else { - val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sqlContext, RDDConverterUtils.DF_ID_COLUMN) + val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sparkSession, RDDConverterUtils.DF_ID_COLUMN) return PredictionUtils.joinUsingID(dataset, predictedDF) } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala index c47fb3c..08154bb 100644 --- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala +++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala @@ -80,7 +80,7 @@ trait BaseSystemMLRegressorModel extends BaseSystemMLEstimatorModel { val Xin_bin = new BinaryBlockMatrix(Xin, mcXin) val modelPredict = ml.execute(script._1.in(script._2, Xin_bin)) val predictedDF = modelPredict.getDataFrame(predictionVar).select(RDDConverterUtils.DF_ID_COLUMN, "C1").withColumnRenamed("C1", "prediction") - val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sqlContext, RDDConverterUtils.DF_ID_COLUMN) + val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sparkSession, RDDConverterUtils.DF_ID_COLUMN) return PredictionUtils.joinUsingID(dataset, predictedDF) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala index c0e3f35..ce89502 100644 --- a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala +++ b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala @@ -111,16 +111,16 @@ class LogisticRegressionModel(override val uid: String)( */ object LogisticRegressionExample { import org.apache.spark.{ SparkConf, SparkContext } + import 
org.apache.spark.sql._ import org.apache.spark.sql.types._ import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.feature.LabeledPoint def main(args: Array[String]) = { - val sparkConf: SparkConf = new SparkConf(); - val sc: SparkContext = new SparkContext("local", "TestLocal", sparkConf); - val sqlContext = new org.apache.spark.sql.SQLContext(sc); + val sparkSession = SparkSession.builder().master("local").appName("TestLocal").getOrCreate(); + val sc: SparkContext = sparkSession.sparkContext; - import sqlContext.implicits._ + import sparkSession.implicits._ val training = sc.parallelize(Seq( LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)), LabeledPoint(1.0, Vectors.dense(1.0, 0.4, 2.1)), @@ -130,7 +130,7 @@ object LogisticRegressionExample { LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 2.3)))) val lr = new LogisticRegression("log", sc) val lrmodel = lr.fit(training.toDF) - // lrmodel.mloutput.getDF(sqlContext, "B_out").show() + // lrmodel.mloutput.getDF(sparkSession, "B_out").show() val testing = sc.parallelize(Seq( LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)), http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java b/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java index 5b25c71..e5ed921 100644 --- a/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java +++ b/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java @@ -719,13 +719,11 @@ public abstract class AutomatedTestBase TestUtils.writeTestScalar(baseDirectory + EXPECTED_DIR + cacheDir + name, value); expectedFiles.add(baseDirectory + EXPECTED_DIR + cacheDir + name); } - - @SuppressWarnings("deprecation") + protected static HashMap<CellIndex, Double> readDMLMatrixFromHDFS(String fileName) { return TestUtils.readDMLMatrixFromHDFS(baseDirectory + OUTPUT_DIR + fileName); } - @SuppressWarnings("deprecation") public HashMap<CellIndex, Double> readRMatrixFromFS(String fileName) { System.out.println("R script out: " + baseDirectory + EXPECTED_DIR + cacheDir + fileName); return TestUtils.readRMatrixFromFS(baseDirectory + EXPECTED_DIR + cacheDir + fileName); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/conversion/RDDConverterUtilsExtTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/conversion/RDDConverterUtilsExtTest.java b/src/test/java/org/apache/sysml/test/integration/conversion/RDDConverterUtilsExtTest.java index 7a69423..8ca1d8d 100644 --- a/src/test/java/org/apache/sysml/test/integration/conversion/RDDConverterUtilsExtTest.java +++ b/src/test/java/org/apache/sysml/test/integration/conversion/RDDConverterUtilsExtTest.java @@ -32,7 +32,7 @@ import org.apache.spark.api.java.function.Function; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; -import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -85,12 +85,12 @@ public class RDDConverterUtilsExtTest extends AutomatedTestBase { list.add("[1.2, 3.4]"); JavaRDD<String> javaRddString = 
sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddString.map(new StringToRow()); - SQLContext sqlContext = new SQLContext(sc); + SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("C1", DataTypes.StringType, true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> inDF = sqlContext.createDataFrame(javaRddRow, schema); - Dataset<Row> outDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(sqlContext, inDF); + Dataset<Row> inDF = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> outDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(sparkSession, inDF); List<String> expectedResults = new ArrayList<String>(); expectedResults.add("[[1.2,4.3,3.4]]"); @@ -111,12 +111,12 @@ public class RDDConverterUtilsExtTest extends AutomatedTestBase { list.add(null); JavaRDD<String> javaRddString = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddString.map(new StringToRow()); - SQLContext sqlContext = new SQLContext(sc); + SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("C1", DataTypes.StringType, true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> inDF = sqlContext.createDataFrame(javaRddRow, schema); - Dataset<Row> outDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(sqlContext, inDF); + Dataset<Row> inDF = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> outDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(sparkSession, inDF); List<String> expectedResults = new ArrayList<String>(); expectedResults.add("[[1.2,3.4]]"); @@ -134,12 +134,12 @@ public class RDDConverterUtilsExtTest extends AutomatedTestBase { list.add("[cheeseburger,fries]"); JavaRDD<String> javaRddString = sc.parallelize(list); JavaRDD<Row> javaRddRow = javaRddString.map(new StringToRow()); - SQLContext sqlContext = new SQLContext(sc); + SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fields = new ArrayList<StructField>(); fields.add(DataTypes.createStructField("C1", DataTypes.StringType, true)); StructType schema = DataTypes.createStructType(fields); - Dataset<Row> inDF = sqlContext.createDataFrame(javaRddRow, schema); - Dataset<Row> outDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(sqlContext, inDF); + Dataset<Row> inDF = sparkSession.createDataFrame(javaRddRow, schema); + Dataset<Row> outDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(sparkSession, inDF); // trigger evaluation to throw exception outDF.collectAsList(); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java index d5edf01..f626813 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java @@ -32,7 +32,7 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import 
org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.StructType; import org.apache.sysml.api.DMLScript; import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; @@ -532,10 +532,10 @@ public class FrameConverterTest extends AutomatedTestBase OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo; //Create DataFrame - SQLContext sqlContext = new SQLContext(sc); + SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); StructType dfSchema = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschema, false); JavaRDD<Row> rowRDD = FrameRDDConverterUtils.csvToRowRDD(sc, fnameIn, separator, lschema); - Dataset<Row> df = sqlContext.createDataFrame(rowRDD, dfSchema); + Dataset<Row> df = sparkSession.createDataFrame(rowRDD, dfSchema); JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils .dataFrameToBinaryBlock(sc, df, mc, false/*, columns*/) @@ -549,7 +549,8 @@ public class FrameConverterTest extends AutomatedTestBase JavaPairRDD<Long, FrameBlock> rddIn = sc .hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class) .mapToPair(new LongWritableFrameToLongFrameFunction()); - Dataset<Row> df = FrameRDDConverterUtils.binaryBlockToDataFrame(new SQLContext(sc), rddIn, mc, lschema); + SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); + Dataset<Row> df = FrameRDDConverterUtils.binaryBlockToDataFrame(sparkSession, rddIn, mc, lschema); //Convert back DataFrame to binary block for comparison using original binary to converted DF and back to binary JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java index d3fdc2a..199100e 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java @@ -23,8 +23,7 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SQLContext; -import org.junit.Test; +import org.apache.spark.sql.SparkSession; import org.apache.sysml.api.DMLScript; import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; import org.apache.sysml.conf.ConfigurationManager; @@ -38,6 +37,7 @@ import org.apache.sysml.runtime.util.DataConverter; import org.apache.sysml.test.integration.AutomatedTestBase; import org.apache.sysml.test.integration.TestConfiguration; import org.apache.sysml.test.utils.TestUtils; +import org.junit.Test; public class DataFrameMatrixConversionTest extends AutomatedTestBase @@ -191,13 +191,13 @@ public class DataFrameMatrixConversionTest extends AutomatedTestBase //setup spark context sec = (SparkExecutionContext) ExecutionContextFactory.createContext(); JavaSparkContext sc = sec.getSparkContext(); - SQLContext sqlctx = new SQLContext(sc); + SparkSession sparkSession = 
SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); //get binary block input rdd JavaPairRDD<MatrixIndexes,MatrixBlock> in = SparkExecutionContext.toMatrixJavaPairRDD(sc, mbA, blksz, blksz); //matrix - dataframe - matrix conversion - Dataset<Row> df = RDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, vector); + Dataset<Row> df = RDDConverterUtils.binaryBlockToDataFrame(sparkSession, in, mc1, vector); df = ( rows==rows3 ) ? df.repartition(rows) : df; JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true, vector); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java index 0e826a3..09628e5 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java @@ -23,8 +23,7 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SQLContext; -import org.junit.Test; +import org.apache.spark.sql.SparkSession; import org.apache.sysml.api.DMLScript; import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; import org.apache.sysml.conf.ConfigurationManager; @@ -40,6 +39,7 @@ import org.apache.sysml.runtime.util.UtilFunctions; import org.apache.sysml.test.integration.AutomatedTestBase; import org.apache.sysml.test.integration.TestConfiguration; import org.apache.sysml.test.utils.TestUtils; +import org.junit.Test; public class DataFrameRowFrameConversionTest extends AutomatedTestBase @@ -216,15 +216,16 @@ public class DataFrameRowFrameConversionTest extends AutomatedTestBase //setup spark context sec = (SparkExecutionContext) ExecutionContextFactory.createContext(); JavaSparkContext sc = sec.getSparkContext(); + SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); + sc.getConf().set("spark.memory.offHeap.enabled", "false"); - SQLContext sqlctx = new SQLContext(sc); - sqlctx.setConf("spark.sql.codegen.wholeStage", "false"); - + sparkSession.conf().set("spark.sql.codegen.wholeStage", "false"); + //get binary block input rdd JavaPairRDD<Long,FrameBlock> in = SparkExecutionContext.toFrameJavaPairRDD(sc, fbA); //frame - dataframe - frame conversion - Dataset<Row> df = FrameRDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, schema); + Dataset<Row> df = FrameRDDConverterUtils.binaryBlockToDataFrame(sparkSession, in, mc1, schema); JavaPairRDD<Long,FrameBlock> out = FrameRDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true); //get output frame block http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java 
b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java index c6d2251..4a73376 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java @@ -30,12 +30,11 @@ import org.apache.spark.ml.linalg.VectorUDT; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; -import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; -import org.junit.Test; import org.apache.sysml.api.DMLScript; import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; import org.apache.sysml.conf.ConfigurationManager; @@ -53,6 +52,7 @@ import org.apache.sysml.runtime.util.UtilFunctions; import org.apache.sysml.test.integration.AutomatedTestBase; import org.apache.sysml.test.integration.TestConfiguration; import org.apache.sysml.test.utils.TestUtils; +import org.junit.Test; public class DataFrameVectorFrameConversionTest extends AutomatedTestBase @@ -268,10 +268,10 @@ public class DataFrameVectorFrameConversionTest extends AutomatedTestBase //setup spark context sec = (SparkExecutionContext) ExecutionContextFactory.createContext(); JavaSparkContext sc = sec.getSparkContext(); - SQLContext sqlctx = new SQLContext(sc); + SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); //create input data frame - Dataset<Row> df = createDataFrame(sqlctx, mbA, containsID, schema); + Dataset<Row> df = createDataFrame(sparkSession, mbA, containsID, schema); //dataframe - frame conversion JavaPairRDD<Long,FrameBlock> out = FrameRDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, containsID); @@ -294,17 +294,9 @@ public class DataFrameVectorFrameConversionTest extends AutomatedTestBase DMLScript.rtplatform = oldPlatform; } } - - /** - * - * @param sqlctx - * @param mb - * @param schema - * @return - * @throws DMLRuntimeException - */ + @SuppressWarnings("resource") - private Dataset<Row> createDataFrame(SQLContext sqlctx, MatrixBlock mb, boolean containsID, ValueType[] schema) + private Dataset<Row> createDataFrame(SparkSession sparkSession, MatrixBlock mb, boolean containsID, ValueType[] schema) throws DMLRuntimeException { //create in-memory list of rows @@ -350,8 +342,8 @@ public class DataFrameVectorFrameConversionTest extends AutomatedTestBase StructType dfSchema = DataTypes.createStructType(fields); //create rdd and data frame - JavaSparkContext sc = new JavaSparkContext(sqlctx.sparkContext()); + JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext()); JavaRDD<Row> rowRDD = sc.parallelize(list); - return sqlctx.createDataFrame(rowRDD, dfSchema); + return sparkSession.createDataFrame(rowRDD, dfSchema); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorScriptTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorScriptTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorScriptTest.java index 14ed4b7..92677b8 
100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorScriptTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorScriptTest.java @@ -32,12 +32,11 @@ import org.apache.spark.ml.linalg.VectorUDT; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; -import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; -import org.junit.Test; import org.apache.sysml.api.mlcontext.FrameFormat; import org.apache.sysml.api.mlcontext.FrameMetadata; import org.apache.sysml.api.mlcontext.MLContext; @@ -55,6 +54,7 @@ import org.apache.sysml.runtime.util.UtilFunctions; import org.apache.sysml.test.integration.AutomatedTestBase; import org.apache.sysml.test.integration.TestConfiguration; import org.apache.sysml.test.utils.TestUtils; +import org.junit.Test; public class DataFrameVectorScriptTest extends AutomatedTestBase @@ -269,10 +269,10 @@ public class DataFrameVectorScriptTest extends AutomatedTestBase SparkConf conf = SparkExecutionContext.createSystemMLSparkConf() .setAppName("MLContextFrameTest").setMaster("local"); sc = new JavaSparkContext(conf); - SQLContext sqlctx = new SQLContext(sc); + SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); //create input data frame - Dataset<Row> df = createDataFrame(sqlctx, mbA, containsID, schema); + Dataset<Row> df = createDataFrame(sparkSession, mbA, containsID, schema); // Create full frame metadata, and empty frame metadata FrameMetadata meta = new FrameMetadata(containsID ? 
FrameFormat.DF_WITH_INDEX : @@ -315,17 +315,9 @@ public class DataFrameVectorScriptTest extends AutomatedTestBase ml.close(); } } - - /** - * - * @param sqlctx - * @param mb - * @param schema - * @return - * @throws DMLRuntimeException - */ + @SuppressWarnings("resource") - private Dataset<Row> createDataFrame(SQLContext sqlctx, MatrixBlock mb, boolean containsID, ValueType[] schema) + private Dataset<Row> createDataFrame(SparkSession sparkSession, MatrixBlock mb, boolean containsID, ValueType[] schema) throws DMLRuntimeException { //create in-memory list of rows @@ -371,8 +363,8 @@ public class DataFrameVectorScriptTest extends AutomatedTestBase StructType dfSchema = DataTypes.createStructType(fields); //create rdd and data frame - JavaSparkContext sc = new JavaSparkContext(sqlctx.sparkContext()); + JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext()); JavaRDD<Row> rowRDD = sc.parallelize(list); - return sqlctx.createDataFrame(rowRDD, dfSchema); + return sparkSession.createDataFrame(rowRDD, dfSchema); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java index e6a947f..d485c48 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java @@ -33,7 +33,7 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.StructType; import org.apache.sysml.api.DMLException; import org.apache.sysml.api.DMLScript; @@ -68,7 +68,6 @@ import org.junit.BeforeClass; import org.junit.Test; -@SuppressWarnings("deprecation") public class FrameTest extends AutomatedTestBase { private final static String TEST_DIR = "functions/frame/"; @@ -238,15 +237,16 @@ public class FrameTest extends AutomatedTestBase if(bFromDataFrame) { //Create DataFrame for input A - SQLContext sqlContext = new SQLContext(sc); + SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schema, false); + JavaRDD<Row> rowRDDA = FrameRDDConverterUtils.csvToRowRDD(sc, input("A"), DataExpression.DEFAULT_DELIM_DELIMITER, schema); - dfA = sqlContext.createDataFrame(rowRDDA, dfSchemaA); + dfA = sparkSession.createDataFrame(rowRDDA, dfSchemaA); //Create DataFrame for input B StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schemaB, false); JavaRDD<Row> rowRDDB = FrameRDDConverterUtils.csvToRowRDD(sc, input("B"), DataExpression.DEFAULT_DELIM_DELIMITER, schemaB); - dfB = sqlContext.createDataFrame(rowRDDB, dfSchemaB); + dfB = sparkSession.createDataFrame(rowRDDB, dfSchemaB); } try http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java 
b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java index 0b3fac4..6dd74d3 100644 --- a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java +++ b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java @@ -34,7 +34,7 @@ import org.apache.spark.rdd.RDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; -import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -53,7 +53,6 @@ import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils; import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils; import org.apache.sysml.test.integration.AutomatedTestBase; import org.apache.sysml.test.integration.mlcontext.MLContextTest.CommaSeparatedValueStringToDoubleArrayRow; - import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -239,11 +238,11 @@ public class MLContextFrameTest extends AutomatedTestBase { JavaRDD<Row> javaRddRowB = FrameRDDConverterUtils.csvToRowRDD(sc, javaRDDB, CSV_DELIM, schemaB); // Create DataFrame - SQLContext sqlContext = new SQLContext(sc); + SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schemaA, false); - Dataset<Row> dataFrameA = sqlContext.createDataFrame(javaRddRowA, dfSchemaA); + Dataset<Row> dataFrameA = sparkSession.createDataFrame(javaRddRowA, dfSchemaA); StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schemaB, false); - Dataset<Row> dataFrameB = sqlContext.createDataFrame(javaRddRowB, dfSchemaB); + Dataset<Row> dataFrameB = sparkSession.createDataFrame(javaRddRowB, dfSchemaB); if (script_type == SCRIPT_TYPE.DML) script = dml("A[2:3,2:4]=B;C=A[2:3,2:3]").in("A", dataFrameA, fmA).in("B", dataFrameB, fmB).out("A") .out("C"); @@ -493,18 +492,18 @@ public class MLContextFrameTest extends AutomatedTestBase { JavaRDD<Row> javaRddRowA = FrameRDDConverterUtils.csvToRowRDD(sc, javaRddStringA, CSV_DELIM, schema); JavaRDD<Row> javaRddRowB = javaRddStringB.map(new CommaSeparatedValueStringToDoubleArrayRow()); - SQLContext sqlContext = new SQLContext(sc); + SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fieldsA = new ArrayList<StructField>(); fieldsA.add(DataTypes.createStructField("1", DataTypes.StringType, true)); fieldsA.add(DataTypes.createStructField("2", DataTypes.DoubleType, true)); StructType schemaA = DataTypes.createStructType(fieldsA); - Dataset<Row> dataFrameA = sqlContext.createDataFrame(javaRddRowA, schemaA); + Dataset<Row> dataFrameA = sparkSession.createDataFrame(javaRddRowA, schemaA); List<StructField> fieldsB = new ArrayList<StructField>(); fieldsB.add(DataTypes.createStructField("1", DataTypes.DoubleType, true)); StructType schemaB = DataTypes.createStructType(fieldsB); - Dataset<Row> dataFrameB = sqlContext.createDataFrame(javaRddRowB, schemaB); + Dataset<Row> dataFrameB = sparkSession.createDataFrame(javaRddRowB, schemaB); String dmlString = "[tA, tAM] = transformencode (target = A, spec = \"{ids: true ,recode: [ 1, 2 ]}\");\n" + "C = tA %*% B;\n" + "M = s * C;"; @@ -530,14 +529,14 @@ public class MLContextFrameTest extends AutomatedTestBase { JavaRDD<Row> javaRddRowA = sc. 
parallelize( Arrays.asList(rowsA)); - SQLContext sqlContext = new SQLContext(sc); + SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fieldsA = new ArrayList<StructField>(); fieldsA.add(DataTypes.createStructField("myID", DataTypes.StringType, true)); fieldsA.add(DataTypes.createStructField("FeatureName", DataTypes.StringType, true)); fieldsA.add(DataTypes.createStructField("FeatureValue", DataTypes.IntegerType, true)); StructType schemaA = DataTypes.createStructType(fieldsA); - Dataset<Row> dataFrameA = sqlContext.createDataFrame(javaRddRowA, schemaA); + Dataset<Row> dataFrameA = sparkSession.createDataFrame(javaRddRowA, schemaA); String dmlString = "[tA, tAM] = transformencode (target = A, spec = \"{ids: false ,recode: [ myID, FeatureName ]}\");"; @@ -572,14 +571,14 @@ public class MLContextFrameTest extends AutomatedTestBase { JavaRDD<Row> javaRddRowA = sc. parallelize( Arrays.asList(rowsA)); - SQLContext sqlContext = new SQLContext(sc); + SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); List<StructField> fieldsA = new ArrayList<StructField>(); fieldsA.add(DataTypes.createStructField("featureName", DataTypes.StringType, true)); fieldsA.add(DataTypes.createStructField("featureValue", DataTypes.IntegerType, true)); fieldsA.add(DataTypes.createStructField("id", DataTypes.StringType, true)); StructType schemaA = DataTypes.createStructType(fieldsA); - Dataset<Row> dataFrameA = sqlContext.createDataFrame(javaRddRowA, schemaA); + Dataset<Row> dataFrameA = sparkSession.createDataFrame(javaRddRowA, schemaA); String dmlString = "[tA, tAM] = transformencode (target = A, spec = \"{ids: false ,recode: [ featureName, id ]}\");"; @@ -622,7 +621,7 @@ public class MLContextFrameTest extends AutomatedTestBase { // JavaRDD<Row> javaRddRowA = javaRddStringA.map(new // CommaSeparatedValueStringToRow()); // - // SQLContext sqlContext = new SQLContext(sc); + // SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate(); // // List<StructField> fieldsA = new ArrayList<StructField>(); // fieldsA.add(DataTypes.createStructField("1", DataTypes.StringType, @@ -630,7 +629,7 @@ public class MLContextFrameTest extends AutomatedTestBase { // fieldsA.add(DataTypes.createStructField("2", DataTypes.StringType, // true)); // StructType schemaA = DataTypes.createStructType(fieldsA); - // DataFrame dataFrameA = sqlContext.createDataFrame(javaRddRowA, schemaA); + // DataFrame dataFrameA = sparkSession.createDataFrame(javaRddRowA, schemaA); // // String dmlString = "[tA, tAM] = transformencode (target = A, spec = // \"{ids: true ,recode: [ 1, 2 ]}\");\n";
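
For readers migrating their own code, the following stand-alone Java sketch (not part of this commit; the class and variable names are illustrative only) summarizes the pattern the patch applies across the files above: build a SparkSession instead of constructing a deprecated SQLContext, move configuration to sparkSession.conf().set(...), and create DataFrames from the session.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class SparkSessionMigrationExample {

  public static void main(String[] args) {
    // Spark 2 entry point: replaces "new SQLContext(sc)".
    SparkSession spark = SparkSession.builder()
        .master("local")
        .appName("SparkSessionMigrationExample")
        .getOrCreate();

    // When a JavaSparkContext already exists (as in the tests above), the session
    // can instead be built around it:
    //   SparkSession.builder().sparkContext(sc.sc()).getOrCreate()
    JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

    // Configuration moves from sqlContext.setConf(...) to sparkSession.conf().set(...).
    spark.conf().set("spark.sql.codegen.wholeStage", "false");

    // DataFrame creation is unchanged apart from the entry point.
    List<Row> rows = Arrays.asList(RowFactory.create("[1.2, 3.4]"), RowFactory.create("[5.6, 7.8]"));
    JavaRDD<Row> rowRDD = sc.parallelize(rows);
    StructType schema = DataTypes.createStructType(
        Arrays.asList(DataTypes.createStructField("C1", DataTypes.StringType, true)));
    Dataset<Row> df = spark.createDataFrame(rowRDD, schema);

    // An existing Dataset exposes its session as df.sparkSession() (df.sparkSession in Scala),
    // replacing the deprecated df.sqlContext().
    df.sparkSession().conf().set("spark.sql.shuffle.partitions", "4");

    df.show();
    spark.stop();
  }
}

Note on the test changes above: SparkSession.builder().sparkContext(sc.sc()).getOrCreate() is used rather than a fresh builder so the session wraps the test harness's existing SparkContext instead of starting a second one.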
