[SYSTEMML-1194] Replace SQLContext with SparkSession

The SQLContext constructors have been deprecated since Spark 2.0, so replace SQLContext references with SparkSession across the Java, Scala, and Python APIs.
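
For reference, a minimal Java sketch of the migration pattern used throughout this change (illustrative only, not part of the patch; the class and method names below are made up): instead of constructing a SQLContext, obtain a SparkSession from the existing SparkContext via the builder and create DataFrames from it.

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.StructType;

    public class SparkSessionMigrationSketch {
        // Before (deprecated in Spark 2):
        //   SQLContext sqlContext = new SQLContext(jsc);
        //   return sqlContext.createDataFrame(rows, schema);
        public static Dataset<Row> toDataFrame(JavaSparkContext jsc, JavaRDD<Row> rows, StructType schema) {
            // After: reuse (or create) the session tied to the existing SparkContext
            SparkSession sparkSession = SparkSession.builder().sparkContext(jsc.sc()).getOrCreate();
            return sparkSession.createDataFrame(rows, schema);
        }
    }

This mirrors the pattern applied in MLOutput and MLContextConversionUtil below, where a SparkSession is built from the JavaSparkContext rather than instantiating a new SQLContext.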

Closes #360.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/80ab57bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/80ab57bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/80ab57bd

Branch: refs/heads/master
Commit: 80ab57bda803111e566284aed31ca241ad73edee
Parents: 201238f
Author: Deron Eriksson <[email protected]>
Authored: Wed Feb 15 11:28:28 2017 -0800
Committer: Deron Eriksson <[email protected]>
Committed: Wed Feb 15 11:28:28 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/sysml/api/MLContext.java    | 14 +--
 .../java/org/apache/sysml/api/MLMatrix.java     | 28 +++---
 .../java/org/apache/sysml/api/MLOutput.java     | 50 ++++++-----
 .../api/mlcontext/MLContextConversionUtil.java  |  9 +-
 .../org/apache/sysml/api/python/SystemML.py     | 34 ++------
 .../spark/utils/FrameRDDConverterUtils.java     | 13 ++-
 .../spark/utils/RDDConverterUtils.java          | 13 ++-
 .../spark/utils/RDDConverterUtilsExt.java       | 44 ++++++++--
 src/main/python/systemml/converters.py          |  4 +-
 src/main/python/systemml/defmatrix.py           |  8 +-
 src/main/python/systemml/mllearn/estimators.py  | 76 ++++++++---------
 src/main/python/tests/test_mllearn_df.py        | 12 +--
 src/main/python/tests/test_mllearn_numpy.py     | 20 ++---
 .../sysml/api/ml/BaseSystemMLClassifier.scala   |  4 +-
 .../sysml/api/ml/BaseSystemMLRegressor.scala    |  2 +-
 .../sysml/api/ml/LogisticRegression.scala       | 10 +--
 .../test/integration/AutomatedTestBase.java     |  4 +-
 .../conversion/RDDConverterUtilsExtTest.java    | 20 ++---
 .../functions/frame/FrameConverterTest.java     |  9 +-
 .../DataFrameMatrixConversionTest.java          |  8 +-
 .../DataFrameRowFrameConversionTest.java        | 13 +--
 .../DataFrameVectorFrameConversionTest.java     | 24 ++----
 .../mlcontext/DataFrameVectorScriptTest.java    | 24 ++----
 .../functions/mlcontext/FrameTest.java          | 10 +--
 .../mlcontext/MLContextFrameTest.java           | 27 +++---
 .../integration/mlcontext/MLContextTest.java    | 90 ++++++++++----------
 .../sysml/api/ml/LogisticRegressionSuite.scala  |  9 +-
 .../sysml/api/ml/WrapperSparkContext.scala      | 16 ++--
 28 files changed, 294 insertions(+), 301 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/api/MLContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLContext.java 
b/src/main/java/org/apache/sysml/api/MLContext.java
index 520e51e..a128c37 100644
--- a/src/main/java/org/apache/sysml/api/MLContext.java
+++ b/src/main/java/org/apache/sysml/api/MLContext.java
@@ -37,7 +37,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
 import org.apache.sysml.api.jmlc.JMLCUtils;
 import org.apache.sysml.api.mlcontext.ScriptType;
@@ -101,9 +101,9 @@ import org.apache.sysml.utils.Statistics;
  * <p>
  * Create input DataFrame from CSV file and potentially perform some feature 
transformation
  * <pre><code>
- * scala&gt; val W = sqlContext.load("com.databricks.spark.csv", Map("path" 
-&gt; "W.csv", "header" -&gt; "false"))
- * scala&gt; val H = sqlContext.load("com.databricks.spark.csv", Map("path" 
-&gt; "H.csv", "header" -&gt; "false"))
- * scala&gt; val V = sqlContext.load("com.databricks.spark.csv", Map("path" 
-&gt; "V.csv", "header" -&gt; "false"))
+ * scala&gt; val W = sparkSession.load("com.databricks.spark.csv", Map("path" 
-&gt; "W.csv", "header" -&gt; "false"))
+ * scala&gt; val H = sparkSession.load("com.databricks.spark.csv", Map("path" 
-&gt; "H.csv", "header" -&gt; "false"))
+ * scala&gt; val V = sparkSession.load("com.databricks.spark.csv", Map("path" 
-&gt; "V.csv", "header" -&gt; "false"))
  * </code></pre>
  * <p>
  * Create MLContext
@@ -1578,7 +1578,7 @@ public class MLContext {
        // TODO: Add additional create to provide sep, missing values, etc. for 
CSV
        /**
         * Experimental API: Might be discontinued in future release
-        * @param sqlContext the SQLContext
+        * @param sparkSession the Spark Session
         * @param filePath the file path
         * @param format the format
         * @return the MLMatrix
@@ -1586,12 +1586,12 @@ public class MLContext {
         * @throws DMLException if DMLException occurs
         * @throws ParseException if ParseException occurs
         */
-       public MLMatrix read(SQLContext sqlContext, String filePath, String 
format) throws IOException, DMLException, ParseException {
+       public MLMatrix read(SparkSession sparkSession, String filePath, String 
format) throws IOException, DMLException, ParseException {
                this.reset();
                this.registerOutput("output");
                MLOutput out = this.executeScript("output = read(\"" + filePath 
+ "\", format=\"" + format + "\"); " + MLMatrix.writeStmt);
                JavaPairRDD<MatrixIndexes, MatrixBlock> blocks = 
out.getBinaryBlockedRDD("output");
                MatrixCharacteristics mcOut = 
out.getMatrixCharacteristics("output");
-               return MLMatrix.createMLMatrix(this, sqlContext, blocks, mcOut);
+               return MLMatrix.createMLMatrix(this, sparkSession, blocks, 
mcOut);
        }       
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/api/MLMatrix.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLMatrix.java 
b/src/main/java/org/apache/sysml/api/MLMatrix.java
index a8afa76..873e831 100644
--- a/src/main/java/org/apache/sysml/api/MLMatrix.java
+++ b/src/main/java/org/apache/sysml/api/MLMatrix.java
@@ -25,10 +25,10 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.execution.QueryExecution;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.encoders.RowEncoder;
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.execution.QueryExecution;
 import org.apache.spark.sql.types.StructType;
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -57,8 +57,8 @@ import scala.Tuple2;
 
  import org.apache.sysml.api.{MLContext, MLMatrix}
  val ml = new MLContext(sc)
- val mat1 = ml.read(sqlContext, "V_small.csv", "csv")
- val mat2 = ml.read(sqlContext, "W_small.mtx", "binary")
+ val mat1 = ml.read(sparkSession, "V_small.csv", "csv")
+ val mat2 = ml.read(sparkSession, "W_small.mtx", "binary")
  val result = mat1.transpose() %*% mat2
  result.write("Result_small.mtx", "text")
  
@@ -71,20 +71,20 @@ public class MLMatrix extends Dataset<Row> {
        protected MatrixCharacteristics mc = null;
        protected MLContext ml = null;
        
-       protected MLMatrix(SQLContext sqlContext, LogicalPlan logicalPlan, 
MLContext ml) {
-               super(sqlContext, logicalPlan, RowEncoder.apply(null));
+       protected MLMatrix(SparkSession sparkSession, LogicalPlan logicalPlan, 
MLContext ml) {
+               super(sparkSession, logicalPlan, RowEncoder.apply(null));
                this.ml = ml;
        }
 
-       protected MLMatrix(SQLContext sqlContext, QueryExecution 
queryExecution, MLContext ml) {
-               super(sqlContext.sparkSession(), queryExecution, 
RowEncoder.apply(null));
+       protected MLMatrix(SparkSession sparkSession, QueryExecution 
queryExecution, MLContext ml) {
+               super(sparkSession, queryExecution, RowEncoder.apply(null));
                this.ml = ml;
        }
        
        // Only used internally to set a new MLMatrix after one of matrix 
operations.
        // Not to be used externally.
        protected MLMatrix(Dataset<Row> df, MatrixCharacteristics mc, MLContext 
ml) throws DMLRuntimeException {
-               super(df.sqlContext(), df.logicalPlan(), 
RowEncoder.apply(null));
+               super(df.sparkSession(), df.logicalPlan(), 
RowEncoder.apply(null));
                this.mc = mc;
                this.ml = ml;
        }
@@ -105,10 +105,10 @@ public class MLMatrix extends Dataset<Row> {
 //     }
        
        // 
------------------------------------------------------------------------------------------------
-       static MLMatrix createMLMatrix(MLContext ml, SQLContext sqlContext, 
JavaPairRDD<MatrixIndexes, MatrixBlock> blocks, MatrixCharacteristics mc) 
throws DMLRuntimeException {
+       static MLMatrix createMLMatrix(MLContext ml, SparkSession sparkSession, 
JavaPairRDD<MatrixIndexes, MatrixBlock> blocks, MatrixCharacteristics mc) 
throws DMLRuntimeException {
                RDD<Row> rows = blocks.map(new GetMLBlock()).rdd();
                StructType schema = MLBlock.getDefaultSchemaForBinaryBlock();
-               return new 
MLMatrix(sqlContext.createDataFrame(rows.toJavaRDD(), schema), mc, ml);
+               return new 
MLMatrix(sparkSession.createDataFrame(rows.toJavaRDD(), schema), mc, ml);
        }
        
        /**
@@ -233,7 +233,7 @@ public class MLMatrix extends Dataset<Row> {
                RDD<Row> rows = out.getBinaryBlockedRDD("output").map(new 
GetMLBlock()).rdd();
                StructType schema = MLBlock.getDefaultSchemaForBinaryBlock();
                MatrixCharacteristics mcOut = 
out.getMatrixCharacteristics("output");
-               return new 
MLMatrix(this.sqlContext().createDataFrame(rows.toJavaRDD(), schema), mcOut, 
ml);
+               return new 
MLMatrix(this.sparkSession().createDataFrame(rows.toJavaRDD(), schema), mcOut, 
ml);
        }
        
        private MLMatrix scalarBinaryOp(Double scalar, String op, boolean 
isScalarLeft) throws IOException, DMLException {
@@ -244,7 +244,7 @@ public class MLMatrix extends Dataset<Row> {
                RDD<Row> rows = out.getBinaryBlockedRDD("output").map(new 
GetMLBlock()).rdd();
                StructType schema = MLBlock.getDefaultSchemaForBinaryBlock();
                MatrixCharacteristics mcOut = 
out.getMatrixCharacteristics("output");
-               return new 
MLMatrix(this.sqlContext().createDataFrame(rows.toJavaRDD(), schema), mcOut, 
ml);
+               return new 
MLMatrix(this.sparkSession().createDataFrame(rows.toJavaRDD(), schema), mcOut, 
ml);
        }
        
        // ---------------------------------------------------
@@ -349,7 +349,7 @@ public class MLMatrix extends Dataset<Row> {
                RDD<Row> rows = out.getBinaryBlockedRDD("output").map(new 
GetMLBlock()).rdd();
                StructType schema = MLBlock.getDefaultSchemaForBinaryBlock();
                MatrixCharacteristics mcOut = 
out.getMatrixCharacteristics("output");
-               return new 
MLMatrix(this.sqlContext().createDataFrame(rows.toJavaRDD(), schema), mcOut, 
ml);
+               return new 
MLMatrix(this.sparkSession().createDataFrame(rows.toJavaRDD(), schema), mcOut, 
ml);
        }
        
        // TODO: For 'scalar op matrix' operations: Do implicit conversions 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/api/MLOutput.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/MLOutput.java 
b/src/main/java/org/apache/sysml/api/MLOutput.java
index 0fcbfdf..ca90fc9 100644
--- a/src/main/java/org/apache/sysml/api/MLOutput.java
+++ b/src/main/java/org/apache/sysml/api/MLOutput.java
@@ -26,7 +26,7 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.StructType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
@@ -90,65 +90,68 @@ public class MLOutput {
        /**
         * Note, the output DataFrame has an additional column ID.
         * An easy way to get DataFrame without ID is by df.drop("__INDEX")
-        * @param sqlContext the SQLContext
+        * 
+        * @param sparkSession the Spark Session
         * @param varName the variable name
         * @return the DataFrame
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public Dataset<Row> getDF(SQLContext sqlContext, String varName) throws 
DMLRuntimeException {
-               if(sqlContext == null) {
-                       throw new DMLRuntimeException("SQLContext is not 
created.");
+       public Dataset<Row> getDF(SparkSession sparkSession, String varName) 
throws DMLRuntimeException {
+               if(sparkSession == null) {
+                       throw new DMLRuntimeException("SparkSession is not 
created.");
                }
                JavaPairRDD<MatrixIndexes,MatrixBlock> rdd = 
getBinaryBlockedRDD(varName);
                if(rdd != null) {
                        MatrixCharacteristics mc = _outMetadata.get(varName);
-                       return 
RDDConverterUtils.binaryBlockToDataFrame(sqlContext, rdd, mc, false);
+                       return 
RDDConverterUtils.binaryBlockToDataFrame(sparkSession, rdd, mc, false);
                }
                throw new DMLRuntimeException("Variable " + varName + " not 
found in the output symbol table.");
        }
        
        /**
         * Obtain the DataFrame
-        * @param sqlContext the SQLContext
+        * 
+        * @param sparkSession the Spark Session
         * @param varName the variable name
         * @param outputVector if true, returns DataFrame with two column: ID 
and org.apache.spark.ml.linalg.Vector
         * @return the DataFrame
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public Dataset<Row> getDF(SQLContext sqlContext, String varName, 
boolean outputVector) throws DMLRuntimeException {
-               if(sqlContext == null) {
-                       throw new DMLRuntimeException("SQLContext is not 
created.");
+       public Dataset<Row> getDF(SparkSession sparkSession, String varName, 
boolean outputVector) throws DMLRuntimeException {
+               if(sparkSession == null) {
+                       throw new DMLRuntimeException("SparkSession is not 
created.");
                }
                if(outputVector) {
                        JavaPairRDD<MatrixIndexes,MatrixBlock> rdd = 
getBinaryBlockedRDD(varName);
                        if(rdd != null) {
                                MatrixCharacteristics mc = 
_outMetadata.get(varName);
-                               return 
RDDConverterUtils.binaryBlockToDataFrame(sqlContext, rdd, mc, true);
+                               return 
RDDConverterUtils.binaryBlockToDataFrame(sparkSession, rdd, mc, true);
                        }
                        throw new DMLRuntimeException("Variable " + varName + " 
not found in the output symbol table.");
                }
                else {
-                       return getDF(sqlContext, varName);
+                       return getDF(sparkSession, varName);
                }
                
        }
        
        /**
         * This methods improves the performance of MLPipeline wrappers.
-        * @param sqlContext the SQLContext
+        * 
+        * @param sparkSession the Spark Session
         * @param varName the variable name
         * @param mc the matrix characteristics
         * @return the DataFrame
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public Dataset<Row> getDF(SQLContext sqlContext, String varName, 
MatrixCharacteristics mc) 
+       public Dataset<Row> getDF(SparkSession sparkSession, String varName, 
MatrixCharacteristics mc) 
                throws DMLRuntimeException 
        {
-               if(sqlContext == null)
-                       throw new DMLRuntimeException("SQLContext is not 
created.");
-                       
+               if(sparkSession == null) {
+                       throw new DMLRuntimeException("SparkSession is not 
created.");
+               }
                JavaPairRDD<MatrixIndexes,MatrixBlock> binaryBlockRDD = 
getBinaryBlockedRDD(varName);
-               return RDDConverterUtils.binaryBlockToDataFrame(sqlContext, 
binaryBlockRDD, mc, true);
+               return RDDConverterUtils.binaryBlockToDataFrame(sparkSession, 
binaryBlockRDD, mc, true);
        }
        
        public JavaRDD<String> getStringRDD(String varName, String format) 
throws DMLRuntimeException {
@@ -180,12 +183,13 @@ public class MLOutput {
        public Dataset<Row> getDataFrameRDD(String varName, JavaSparkContext 
jsc) throws DMLRuntimeException {
                JavaPairRDD<Long, FrameBlock> binaryRDD = 
getFrameBinaryBlockedRDD(varName);
                MatrixCharacteristics mcIn = getMatrixCharacteristics(varName);
-               return FrameRDDConverterUtils.binaryBlockToDataFrame(new 
SQLContext(jsc), binaryRDD, mcIn, null);
+               SparkSession sparkSession = 
SparkSession.builder().sparkContext(jsc.sc()).getOrCreate();
+               return 
FrameRDDConverterUtils.binaryBlockToDataFrame(sparkSession, binaryRDD, mcIn, 
null);
        }
        
-       public MLMatrix getMLMatrix(MLContext ml, SQLContext sqlContext, String 
varName) throws DMLRuntimeException {
-               if(sqlContext == null) {
-                       throw new DMLRuntimeException("SQLContext is not 
created.");
+       public MLMatrix getMLMatrix(MLContext ml, SparkSession sparkSession, 
String varName) throws DMLRuntimeException {
+               if(sparkSession == null) {
+                       throw new DMLRuntimeException("SparkSession is not 
created.");
                }
                else if(ml == null) {
                        throw new DMLRuntimeException("MLContext is not 
created.");
@@ -194,7 +198,7 @@ public class MLOutput {
                if(rdd != null) {
                        MatrixCharacteristics mc = 
getMatrixCharacteristics(varName);
                        StructType schema = 
MLBlock.getDefaultSchemaForBinaryBlock();
-                       return new 
MLMatrix(sqlContext.createDataFrame(rdd.map(new GetMLBlock()).rdd(), schema), 
mc, ml);
+                       return new 
MLMatrix(sparkSession.createDataFrame(rdd.map(new GetMLBlock()).rdd(), schema), 
mc, ml);
                }
                throw new DMLRuntimeException("Variable " + varName + " not 
found in the output symbol table.");
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java 
b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
index 5414e4d..0225ea8 100644
--- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
+++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextConversionUtil.java
@@ -37,7 +37,7 @@ import org.apache.spark.ml.linalg.VectorUDT;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.sysml.api.MLContextProxy;
@@ -1243,9 +1243,9 @@ public class MLContextConversionUtil {
                        MatrixCharacteristics mc = 
matrixObject.getMatrixCharacteristics();
 
                        SparkContext sc = ((MLContext) 
MLContextProxy.getActiveMLContextForAPI()).getSparkContext();
-                       SQLContext sqlctx = new SQLContext(sc);
+                       SparkSession sparkSession = 
SparkSession.builder().sparkContext(sc).getOrCreate();
                        
-                       return RDDConverterUtils.binaryBlockToDataFrame(sqlctx, 
binaryBlockMatrix, mc, isVectorDF);                     
+                       return 
RDDConverterUtils.binaryBlockToDataFrame(sparkSession, binaryBlockMatrix, mc, 
isVectorDF);
                } 
                catch (DMLRuntimeException e) {
                        throw new MLContextException("DMLRuntimeException while 
converting matrix object to DataFrame", e);
@@ -1270,7 +1270,8 @@ public class MLContextConversionUtil {
                        MatrixCharacteristics mc = 
frameObject.getMatrixCharacteristics();
 
                        JavaSparkContext jsc = 
MLContextUtil.getJavaSparkContext((MLContext) 
MLContextProxy.getActiveMLContextForAPI());
-                       return 
FrameRDDConverterUtils.binaryBlockToDataFrame(new SQLContext(jsc), 
binaryBlockFrame, mc, frameObject.getSchema());
+                       SparkSession sparkSession = 
SparkSession.builder().sparkContext(jsc.sc()).getOrCreate();
+                       return 
FrameRDDConverterUtils.binaryBlockToDataFrame(sparkSession, binaryBlockFrame, 
mc, frameObject.getSchema());
                } 
                catch (DMLRuntimeException e) {
                        throw new MLContextException("DMLRuntimeException while 
converting frame object to DataFrame", e);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/api/python/SystemML.py
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/api/python/SystemML.py 
b/src/main/java/org/apache/sysml/api/python/SystemML.py
index 3b8ae96..b22c570 100644
--- a/src/main/java/org/apache/sysml/api/python/SystemML.py
+++ b/src/main/java/org/apache/sysml/api/python/SystemML.py
@@ -25,7 +25,7 @@ from py4j.protocol import Py4JJavaError, Py4JError
 import traceback
 import os
 from pyspark.context import SparkContext 
-from pyspark.sql import DataFrame, SQLContext
+from pyspark.sql import DataFrame, SparkSession
 from pyspark.rdd import RDD
 
 
@@ -59,7 +59,7 @@ class MLContext(object):
             setForcedSparkExecType = (args[1] if len(args) > 1 else False)
             self.sc = sc
             self.ml = sc._jvm.org.apache.sysml.api.MLContext(sc._jsc, 
monitorPerformance, setForcedSparkExecType)
-            self.sqlCtx = SQLContext(sc)
+            self.sparkSession = SparkSession.builder.getOrCreate()
         except Py4JError:
             traceback.print_exc()
 
@@ -212,41 +212,21 @@ class MLOutput(object):
 
     def getBinaryBlockedRDD(self, varName):
         raise Exception('Not supported in Python MLContext')
-        #try:
-        #    rdd = RDD(self.jmlOut.getBinaryBlockedRDD(varName), self.sc)
-        #    return rdd
-        #except Py4JJavaError:
-        #    traceback.print_exc()
 
     def getMatrixCharacteristics(self, varName):
         raise Exception('Not supported in Python MLContext')
-        #try:
-        #    chars = self.jmlOut.getMatrixCharacteristics(varName)
-        #    return chars
-        #except Py4JJavaError:
-        #    traceback.print_exc()
 
-    def getDF(self, sqlContext, varName):
+    def getDF(self, sparkSession, varName):
         try:
-            jdf = self.jmlOut.getDF(sqlContext._ssql_ctx, varName)
-            df = DataFrame(jdf, sqlContext)
+            jdf = self.jmlOut.getDF(sparkSession, varName)
+            df = DataFrame(jdf, sparkSession)
             return df
         except Py4JJavaError:
             traceback.print_exc()
-        
-    def getMLMatrix(self, sqlContext, varName):
+
+    def getMLMatrix(self, sparkSession, varName):
         raise Exception('Not supported in Python MLContext')
-        #try:
-        #    mlm = self.jmlOut.getMLMatrix(sqlContext._scala_SQLContext, 
varName)
-        #    return mlm
-        #except Py4JJavaError:
-        #    traceback.print_exc()
 
     def getStringRDD(self, varName, format):
         raise Exception('Not supported in Python MLContext')
-        #try:
-        #    rdd = RDD(self.jmlOut.getStringRDD(varName, format), self.sc)
-        #    return rdd
-        #except Py4JJavaError:
-        #    traceback.print_exc()
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
index ae3b686..3d5df56 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/FrameRDDConverterUtils.java
@@ -22,9 +22,9 @@ package org.apache.sysml.runtime.instructions.spark.utils;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,14 +42,11 @@ import org.apache.spark.ml.linalg.VectorUDT;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
-
-import scala.Tuple2;
-
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
@@ -70,6 +67,8 @@ import org.apache.sysml.runtime.util.DataConverter;
 import org.apache.sysml.runtime.util.FastStringTokenizer;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
+import scala.Tuple2;
+
 
 
 public class FrameRDDConverterUtils 
@@ -267,7 +266,7 @@ public class FrameRDDConverterUtils
                                new DataFrameToBinaryBlockFunction(mc, 
colnames, fschema, containsID, colVect));
        }
 
-       public static Dataset<Row> binaryBlockToDataFrame(SQLContext sqlctx, 
JavaPairRDD<Long,FrameBlock> in, 
+       public static Dataset<Row> binaryBlockToDataFrame(SparkSession 
sparkSession, JavaPairRDD<Long,FrameBlock> in, 
                        MatrixCharacteristics mc, ValueType[] schema)
        {
                if( !mc.colsKnown() )
@@ -283,7 +282,7 @@ public class FrameRDDConverterUtils
                StructType dfSchema = convertFrameSchemaToDFSchema(schema, 
true);
        
                //rdd to data frame conversion
-               return sqlctx.createDataFrame(rowRDD, dfSchema);
+               return sparkSession.createDataFrame(rowRDD, dfSchema);
        }
        
        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index 1310b80..4d95b95 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -34,21 +34,18 @@ import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.ml.feature.LabeledPoint;
 import org.apache.spark.ml.linalg.DenseVector;
 import org.apache.spark.ml.linalg.Vector;
 import org.apache.spark.ml.linalg.VectorUDT;
 import org.apache.spark.ml.linalg.Vectors;
-import org.apache.spark.ml.feature.LabeledPoint;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.util.LongAccumulator;
-
-import scala.Tuple2;
-
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.DMLRuntimeException;
@@ -68,6 +65,8 @@ import org.apache.sysml.runtime.util.DataConverter;
 import org.apache.sysml.runtime.util.FastStringTokenizer;
 import org.apache.sysml.runtime.util.UtilFunctions;
 
+import scala.Tuple2;
+
 public class RDDConverterUtils 
 {
        public static final String DF_ID_COLUMN = "__INDEX";
@@ -262,7 +261,7 @@ public class RDDConverterUtils
                return out;
        }
 
-       public static Dataset<Row> binaryBlockToDataFrame(SQLContext sqlctx, 
+       public static Dataset<Row> binaryBlockToDataFrame(SparkSession 
sparkSession,
                        JavaPairRDD<MatrixIndexes, MatrixBlock> in, 
MatrixCharacteristics mc, boolean toVector)  
        {
                if( !mc.colsKnown() )
@@ -284,7 +283,7 @@ public class RDDConverterUtils
                }
                
                //rdd to data frame conversion
-               return sqlctx.createDataFrame(rowsRDD.rdd(), 
DataTypes.createStructType(fields));
+               return sparkSession.createDataFrame(rowsRDD.rdd(), 
DataTypes.createStructType(fields));
        }
 
        public static JavaPairRDD<LongWritable, Text> 
stringToSerializableText(JavaPairRDD<Long,String> in)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
index e3b4d0c..f4a02dd 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtilsExt.java
@@ -28,7 +28,6 @@ import java.util.Iterator;
 
 import org.apache.hadoop.io.Text;
 import org.apache.spark.SparkContext;
-import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -44,6 +43,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
@@ -221,11 +221,11 @@ public class RDDConverterUtilsExt
         * Add element indices as new column to DataFrame
         * 
         * @param df input data frame
-        * @param sqlContext SQL context
+        * @param sparkSession the Spark Session
         * @param nameOfCol name of index column
         * @return new data frame
         */
-       public static Dataset<Row> addIDToDataFrame(Dataset<Row> df, SQLContext 
sqlContext, String nameOfCol) {
+       public static Dataset<Row> addIDToDataFrame(Dataset<Row> df, 
SparkSession sparkSession, String nameOfCol) {
                StructField[] oldSchema = df.schema().fields();
                StructField[] newSchema = new StructField[oldSchema.length + 1];
                for(int i = 0; i < oldSchema.length; i++) {
@@ -234,7 +234,7 @@ public class RDDConverterUtilsExt
                newSchema[oldSchema.length] = 
DataTypes.createStructField(nameOfCol, DataTypes.DoubleType, false);
                // JavaRDD<Row> newRows = df.rdd().toJavaRDD().map(new 
AddRowID());
                JavaRDD<Row> newRows = 
df.rdd().toJavaRDD().zipWithIndex().map(new AddRowID());
-               return sqlContext.createDataFrame(newRows, new 
StructType(newSchema));
+               return sparkSession.createDataFrame(newRows, new 
StructType(newSchema));
        }
        
        
@@ -378,9 +378,42 @@ public class RDDConverterUtilsExt
         * @return dataframe of ml.linalg.Vector rows
         * @throws DMLRuntimeException
         *             if DMLRuntimeException occurs
+        *             
+        * @deprecated This will be removed in SystemML 1.0. Please migrate to 
{@code
+        * RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(SparkSession, 
Dataset<Row>) }
         */
+       @Deprecated
        public static Dataset<Row> stringDataFrameToVectorDataFrame(SQLContext 
sqlContext, Dataset<Row> inputDF)
                        throws DMLRuntimeException {
+               SparkSession sparkSession = sqlContext.sparkSession();
+               return stringDataFrameToVectorDataFrame(sparkSession, inputDF);
+       }
+       
+       /**
+        * Convert a dataframe of comma-separated string rows to a dataframe of
+        * ml.linalg.Vector rows.
+        * 
+        * <p>
+        * Example input rows:<br>
+        * 
+        * <code>
+        * ((1.2, 4.3, 3.4))<br>
+        * (1.2, 3.4, 2.2)<br>
+        * [[1.2, 34.3, 1.2, 1.25]]<br>
+        * [1.2, 3.4]<br>
+        * </code>
+        * 
+        * @param sparkSession
+        *            Spark Session
+        * @param inputDF
+        *            dataframe of comma-separated row strings to convert to
+        *            dataframe of ml.linalg.Vector rows
+        * @return dataframe of ml.linalg.Vector rows
+        * @throws DMLRuntimeException
+        *             if DMLRuntimeException occurs
+        */
+       public static Dataset<Row> 
stringDataFrameToVectorDataFrame(SparkSession sparkSession, Dataset<Row> 
inputDF)
+                       throws DMLRuntimeException {
 
                StructField[] oldSchema = inputDF.schema().fields();
                StructField[] newSchema = new StructField[oldSchema.length];
@@ -444,8 +477,7 @@ public class RDDConverterUtilsExt
 
                // output DF
                JavaRDD<Row> newRows = 
inputDF.rdd().toJavaRDD().zipWithIndex().map(new StringToVector());
-               Dataset<Row> outDF = sqlContext.createDataFrame(newRows.rdd(), 
DataTypes.createStructType(newSchema));
-
+               Dataset<Row> outDF = 
sparkSession.createDataFrame(newRows.rdd(), 
DataTypes.createStructType(newSchema));
                return outDF;
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/python/systemml/converters.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/converters.py 
b/src/main/python/systemml/converters.py
index c7ca8ef..2d32508 100644
--- a/src/main/python/systemml/converters.py
+++ b/src/main/python/systemml/converters.py
@@ -36,7 +36,7 @@ def getNumCols(numPyArr):
         return numPyArr.shape[1]
 
 
-def convertToLabeledDF(sqlCtx, X, y=None):
+def convertToLabeledDF(sparkSession, X, y=None):
     from pyspark.ml.feature import VectorAssembler
     if y is not None:
         pd1 = pd.DataFrame(X)
@@ -49,7 +49,7 @@ def convertToLabeledDF(sqlCtx, X, y=None):
         inputColumns = ['C' + str(i) for i in pdf.columns]
         outputColumns = inputColumns
     assembler = VectorAssembler(inputCols=inputColumns, outputCol='features')
-    out = assembler.transform(sqlCtx.createDataFrame(pdf, outputColumns))
+    out = assembler.transform(sparkSession.createDataFrame(pdf, outputColumns))
     if y is not None:
         return out.select('features', 'label')
     else:

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/python/systemml/defmatrix.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/defmatrix.py 
b/src/main/python/systemml/defmatrix.py
index 5f973f6..3e13bf2 100644
--- a/src/main/python/systemml/defmatrix.py
+++ b/src/main/python/systemml/defmatrix.py
@@ -28,7 +28,7 @@ try:
     import py4j.java_gateway
     from py4j.java_gateway import JavaObject
     from pyspark import SparkContext
-    from pyspark.sql import DataFrame, SQLContext
+    from pyspark.sql import DataFrame, SparkSession
     import pyspark.mllib.common
 except ImportError:
     raise ImportError('Unable to import `pyspark`. Hint: Make sure you are 
running with PySpark.')
@@ -46,7 +46,7 @@ def setSparkContext(sc):
         SparkContext
     """
     matrix.sc = sc
-    matrix.sqlContext = SQLContext(sc)
+    matrix.sparkSession = SparkSession.builder.getOrCreate()
     matrix.ml = MLContext(matrix.sc)
 
 
@@ -290,7 +290,7 @@ def solve(A, b):
     >>> import numpy as np
     >>> from sklearn import datasets
     >>> import SystemML as sml
-    >>> from pyspark.sql import SQLContext
+    >>> from pyspark.sql import SparkSession
     >>> diabetes = datasets.load_diabetes()
     >>> diabetes_X = diabetes.data[:, np.newaxis, 2]
     >>> X_train = diabetes_X[:-20]
@@ -523,7 +523,7 @@ class matrix(object):
         if isinstance(self.eval_data, Matrix):
             self.eval_data = self.eval_data.toDF()
             return self.eval_data
-        self.eval_data = matrix.sqlContext.createDataFrame(self.toPandas())
+        self.eval_data = matrix.sparkSession.createDataFrame(self.toPandas())
         return self.eval_data
 
     def save(self, file, format='csv'):

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/python/systemml/mllearn/estimators.py
----------------------------------------------------------------------
diff --git a/src/main/python/systemml/mllearn/estimators.py 
b/src/main/python/systemml/mllearn/estimators.py
index d4ece89..c4eaf3d 100644
--- a/src/main/python/systemml/mllearn/estimators.py
+++ b/src/main/python/systemml/mllearn/estimators.py
@@ -37,8 +37,8 @@ import math
 from ..converters import *
 from ..classloader import *
 
-def assemble(sqlCtx, pdf, inputCols, outputCol):
-    tmpDF = sqlCtx.createDataFrame(pdf, list(pdf.columns))
+def assemble(sparkSession, pdf, inputCols, outputCol):
+    tmpDF = sparkSession.createDataFrame(pdf, list(pdf.columns))
     assembler = VectorAssembler(inputCols=list(inputCols), outputCol=outputCol)
     return assembler.transform(tmpDF)
 
@@ -129,7 +129,7 @@ class BaseSystemMLEstimator(Estimator):
                     raise Exception('Number of rows of X and y should match')
                 colNames = pdfX.columns
                 pdfX[self.label_col] = pdfY[pdfY.columns[0]]
-                df = assemble(self.sqlCtx, pdfX, colNames, 
self.features_col).select(self.features_col, self.label_col)
+                df = assemble(self.sparkSession, pdfX, colNames, 
self.features_col).select(self.features_col, self.label_col)
                 self.fit_df(df)
             else:
                 numColsy = getNumCols(y)
@@ -157,9 +157,9 @@ class BaseSystemMLEstimator(Estimator):
         if isinstance(X, SUPPORTED_TYPES):
             if self.transferUsingDF:
                 pdfX = convertToPandasDF(X)
-                df = assemble(self.sqlCtx, pdfX, pdfX.columns, 
self.features_col).select(self.features_col)
+                df = assemble(self.sparkSession, pdfX, pdfX.columns, 
self.features_col).select(self.features_col)
                 retjDF = self.model.transform(df._jdf)
-                retDF = DataFrame(retjDF, self.sqlCtx)
+                retDF = DataFrame(retjDF, self.sparkSession)
                 retPDF = retDF.sort('__INDEX').select('prediction').toPandas()
                 if isinstance(X, np.ndarray):
                     return self.decode(retPDF.as_matrix().flatten())
@@ -182,7 +182,7 @@ class BaseSystemMLEstimator(Estimator):
                 assembler = VectorAssembler(inputCols=X.columns, 
outputCol=self.features_col)
                 df = assembler.transform(X)
             retjDF = self.model.transform(df._jdf)
-            retDF = DataFrame(retjDF, self.sqlCtx)
+            retDF = DataFrame(retjDF, self.sparkSession)
             # Return DF
             return retDF.sort('__INDEX')
         else:
@@ -245,8 +245,8 @@ class LogisticRegression(BaseSystemMLClassifier):
     
     >>> from sklearn import datasets, neighbors
     >>> from systemml.mllearn import LogisticRegression
-    >>> from pyspark.sql import SQLContext
-    >>> sqlCtx = SQLContext(sc)
+    >>> from pyspark.sql import SparkSession
+    >>> sparkSession = SparkSession.builder.getOrCreate()
     >>> digits = datasets.load_digits()
     >>> X_digits = digits.data
     >>> y_digits = digits.target + 1
@@ -255,7 +255,7 @@ class LogisticRegression(BaseSystemMLClassifier):
     >>> y_train = y_digits[:.9 * n_samples]
     >>> X_test = X_digits[.9 * n_samples:]
     >>> y_test = y_digits[.9 * n_samples:]
-    >>> logistic = LogisticRegression(sqlCtx)
+    >>> logistic = LogisticRegression(sparkSession)
     >>> print('LogisticRegression score: %f' % logistic.fit(X_train, 
y_train).score(X_test, y_test))
     
     MLPipeline way
@@ -263,9 +263,9 @@ class LogisticRegression(BaseSystemMLClassifier):
     >>> from pyspark.ml import Pipeline
     >>> from systemml.mllearn import LogisticRegression
     >>> from pyspark.ml.feature import HashingTF, Tokenizer
-    >>> from pyspark.sql import SQLContext
-    >>> sqlCtx = SQLContext(sc)
-    >>> training = sqlCtx.createDataFrame([
+    >>> from pyspark.sql import SparkSession
+    >>> sparkSession = SparkSession.builder.getOrCreate()
+    >>> training = sparkSession.createDataFrame([
     >>>     (0L, "a b c d e spark", 1.0),
     >>>     (1L, "b d", 2.0),
     >>>     (2L, "spark f g h", 1.0),
@@ -281,10 +281,10 @@ class LogisticRegression(BaseSystemMLClassifier):
     >>> ], ["id", "text", "label"])
     >>> tokenizer = Tokenizer(inputCol="text", outputCol="words")
     >>> hashingTF = HashingTF(inputCol="words", outputCol="features", 
numFeatures=20)
-    >>> lr = LogisticRegression(sqlCtx)
+    >>> lr = LogisticRegression(sparkSession)
     >>> pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
     >>> model = pipeline.fit(training)
-    >>> test = sqlCtx.createDataFrame([
+    >>> test = sparkSession.createDataFrame([
     >>>     (12L, "spark i j k"),
     >>>     (13L, "l m n"),
     >>>     (14L, "mapreduce spark"),
@@ -294,13 +294,13 @@ class LogisticRegression(BaseSystemMLClassifier):
     
     """
     
-    def __init__(self, sqlCtx, penalty='l2', fit_intercept=True, max_iter=100, 
max_inner_iter=0, tol=0.000001, C=1.0, solver='newton-cg', 
transferUsingDF=False):
+    def __init__(self, sparkSession, penalty='l2', fit_intercept=True, 
max_iter=100, max_inner_iter=0, tol=0.000001, C=1.0, solver='newton-cg', 
transferUsingDF=False):
         """
         Performs both binomial and multinomial logistic regression.
         
         Parameters
         ----------
-        sqlCtx: PySpark SQLContext
+        sparkSession: PySpark SparkSession
         penalty: Only 'l2' supported
         fit_intercept: Specifies whether to add intercept or not (default: 
True)
         max_iter: Maximum number of outer (Fisher scoring) iterations 
(default: 100)
@@ -309,8 +309,8 @@ class LogisticRegression(BaseSystemMLClassifier):
         C: 1/regularization parameter (default: 1.0)
         solver: Only 'newton-cg' solver supported
         """
-        self.sqlCtx = sqlCtx
-        self.sc = sqlCtx._sc
+        self.sparkSession = sparkSession
+        self.sc = sparkSession._sc
         createJavaObject(self.sc, 'dummy')
         self.uid = "logReg"
         self.estimator = 
self.sc._jvm.org.apache.sysml.api.ml.LogisticRegression(self.uid, 
self.sc._jsc.sc())
@@ -340,7 +340,7 @@ class LinearRegression(BaseSystemMLRegressor):
     >>> import numpy as np
     >>> from sklearn import datasets
     >>> from systemml.mllearn import LinearRegression
-    >>> from pyspark.sql import SQLContext
+    >>> from pyspark.sql import SparkSession
     >>> # Load the diabetes dataset
     >>> diabetes = datasets.load_diabetes()
     >>> # Use only one feature
@@ -352,7 +352,7 @@ class LinearRegression(BaseSystemMLRegressor):
     >>> diabetes_y_train = diabetes.target[:-20]
     >>> diabetes_y_test = diabetes.target[-20:]
     >>> # Create linear regression object
-    >>> regr = LinearRegression(sqlCtx, solver='newton-cg')
+    >>> regr = LinearRegression(sparkSession, solver='newton-cg')
     >>> # Train the model using the training sets
     >>> regr.fit(diabetes_X_train, diabetes_y_train)
     >>> # The mean square error
@@ -361,13 +361,13 @@ class LinearRegression(BaseSystemMLRegressor):
     """
     
     
-    def __init__(self, sqlCtx, fit_intercept=True, max_iter=100, tol=0.000001, 
C=1.0, solver='newton-cg', transferUsingDF=False):
+    def __init__(self, sparkSession, fit_intercept=True, max_iter=100, 
tol=0.000001, C=1.0, solver='newton-cg', transferUsingDF=False):
         """
         Performs linear regression to model the relationship between one 
numerical response variable and one or more explanatory (feature) variables.
 
         Parameters
         ----------
-        sqlCtx: PySpark SQLContext
+        sparkSession: PySpark SparkSession
         fit_intercept: Specifies whether to add intercept or not (default: 
True)
         max_iter: Maximum number of conjugate gradient iterations, or 0 if no 
maximum limit provided (default: 100)
         tol: Tolerance used in the convergence criterion (default: 0.000001)
@@ -377,8 +377,8 @@ class LinearRegression(BaseSystemMLRegressor):
         'direct-solve' solver is more efficient when the number of features is 
relatively small (m < 1000) and
         input matrix X is either tall or fairly dense; otherwise 'newton-cg' 
solver is more efficient.
         """
-        self.sqlCtx = sqlCtx
-        self.sc = sqlCtx._sc
+        self.sparkSession = sparkSession
+        self.sc = sparkSession._sc
         createJavaObject(self.sc, 'dummy')
         self.uid = "lr"
         if solver == 'newton-cg' or solver == 'direct-solve':
@@ -405,8 +405,8 @@ class SVM(BaseSystemMLClassifier):
     
     >>> from sklearn import datasets, neighbors
     >>> from systemml.mllearn import SVM
-    >>> from pyspark.sql import SQLContext
-    >>> sqlCtx = SQLContext(sc)
+    >>> from pyspark.sql import SparkSession
+    >>> sparkSession = SparkSession.builder.getOrCreate()
     >>> digits = datasets.load_digits()
     >>> X_digits = digits.data
     >>> y_digits = digits.target 
@@ -415,27 +415,27 @@ class SVM(BaseSystemMLClassifier):
     >>> y_train = y_digits[:.9 * n_samples]
     >>> X_test = X_digits[.9 * n_samples:]
     >>> y_test = y_digits[.9 * n_samples:]
-    >>> svm = SVM(sqlCtx, is_multi_class=True)
+    >>> svm = SVM(sparkSession, is_multi_class=True)
     >>> print('LogisticRegression score: %f' % svm.fit(X_train, 
y_train).score(X_test, y_test))
      
     """
 
 
-    def __init__(self, sqlCtx, fit_intercept=True, max_iter=100, tol=0.000001, 
C=1.0, is_multi_class=False, transferUsingDF=False):
+    def __init__(self, sparkSession, fit_intercept=True, max_iter=100, 
tol=0.000001, C=1.0, is_multi_class=False, transferUsingDF=False):
         """
         Performs both binary-class and multiclass SVM (Support Vector 
Machines).
 
         Parameters
         ----------
-        sqlCtx: PySpark SQLContext
+        sparkSession: PySpark SparkSession
         fit_intercept: Specifies whether to add intercept or not (default: 
True)
         max_iter: Maximum number iterations (default: 100)
         tol: Tolerance used in the convergence criterion (default: 0.000001)
         C: 1/regularization parameter (default: 1.0)
         is_multi_class: Specifies whether to use binary-class SVM or 
multi-class SVM algorithm (default: False)
         """
-        self.sqlCtx = sqlCtx
-        self.sc = sqlCtx._sc
+        self.sparkSession = sparkSession
+        self.sc = sparkSession._sc
         self.uid = "svm"
         createJavaObject(self.sc, 'dummy')
         self.estimator = self.sc._jvm.org.apache.sysml.api.ml.SVM(self.uid, 
self.sc._jsc.sc(), is_multi_class)
@@ -461,8 +461,8 @@ class NaiveBayes(BaseSystemMLClassifier):
     >>> from sklearn.feature_extraction.text import TfidfVectorizer
     >>> from systemml.mllearn import NaiveBayes
     >>> from sklearn import metrics
-    >>> from pyspark.sql import SQLContext
-    >>> sqlCtx = SQLContext(sc)
+    >>> from pyspark.sql import SparkSession
+    >>> sparkSession = SparkSession.builder.getOrCreate()
     >>> categories = ['alt.atheism', 'talk.religion.misc', 'comp.graphics', 
'sci.space']
     >>> newsgroups_train = fetch_20newsgroups(subset='train', 
categories=categories)
     >>> newsgroups_test = fetch_20newsgroups(subset='test', 
categories=categories)
@@ -470,24 +470,24 @@ class NaiveBayes(BaseSystemMLClassifier):
     >>> # Both vectors and vectors_test are SciPy CSR matrix
     >>> vectors = vectorizer.fit_transform(newsgroups_train.data)
     >>> vectors_test = vectorizer.transform(newsgroups_test.data)
-    >>> nb = NaiveBayes(sqlCtx)
+    >>> nb = NaiveBayes(sparkSession)
     >>> nb.fit(vectors, newsgroups_train.target)
     >>> pred = nb.predict(vectors_test)
     >>> metrics.f1_score(newsgroups_test.target, pred, average='weighted')
 
     """
     
-    def __init__(self, sqlCtx, laplace=1.0, transferUsingDF=False):
+    def __init__(self, sparkSession, laplace=1.0, transferUsingDF=False):
         """
         Performs Naive Bayes.
 
         Parameters
         ----------
-        sqlCtx: PySpark SQLContext
+        sparkSession: PySpark SparkSession
         laplace: Laplace smoothing specified by the user to avoid creation of 
0 probabilities (default: 1.0)
         """
-        self.sqlCtx = sqlCtx
-        self.sc = sqlCtx._sc
+        self.sparkSession = sparkSession
+        self.sc = sparkSession._sc
         self.uid = "nb"
         createJavaObject(self.sc, 'dummy')
         self.estimator = 
self.sc._jvm.org.apache.sysml.api.ml.NaiveBayes(self.uid, self.sc._jsc.sc())

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/python/tests/test_mllearn_df.py
----------------------------------------------------------------------
diff --git a/src/main/python/tests/test_mllearn_df.py 
b/src/main/python/tests/test_mllearn_df.py
index 0d6a4b4..da49953 100644
--- a/src/main/python/tests/test_mllearn_df.py
+++ b/src/main/python/tests/test_mllearn_df.py
@@ -36,7 +36,7 @@ import numpy as np
 from pyspark.context import SparkContext
 from pyspark.ml import Pipeline
 from pyspark.ml.feature import HashingTF, Tokenizer
-from pyspark.sql import SQLContext
+from pyspark.sql import SparkSession
 from sklearn import datasets, metrics, neighbors
 from sklearn.datasets import fetch_20newsgroups
 from sklearn.feature_extraction.text import TfidfVectorizer
@@ -44,7 +44,7 @@ from sklearn.feature_extraction.text import TfidfVectorizer
 from systemml.mllearn import LinearRegression, LogisticRegression, NaiveBayes, 
SVM
 
 sc = SparkContext()
-sqlCtx = SQLContext(sc)
+sparkSession = SparkSession.builder.getOrCreate()
 
 # Currently not integrated with JUnit test
 # ~/spark-1.6.1-scala-2.11/bin/spark-submit --master local[*] 
--driver-class-path SystemML.jar test.py
@@ -60,7 +60,7 @@ class TestMLLearn(unittest.TestCase):
         X_test = X_digits[int(.9 * n_samples):]
         y_test = y_digits[int(.9 * n_samples):]
         # Convert to DataFrame for i/o: current way to transfer data
-        logistic = LogisticRegression(sqlCtx, transferUsingDF=True)
+        logistic = LogisticRegression(sparkSession, transferUsingDF=True)
         score = logistic.fit(X_train, y_train).score(X_test, y_test)
         self.failUnless(score > 0.9)
 
@@ -71,7 +71,7 @@ class TestMLLearn(unittest.TestCase):
         diabetes_X_test = diabetes_X[-20:]
         diabetes_y_train = diabetes.target[:-20]
         diabetes_y_test = diabetes.target[-20:]
-        regr = LinearRegression(sqlCtx, transferUsingDF=True)
+        regr = LinearRegression(sparkSession, transferUsingDF=True)
         regr.fit(diabetes_X_train, diabetes_y_train)
         score = regr.score(diabetes_X_test, diabetes_y_test)
         self.failUnless(score > 0.4) # TODO: Improve r2-score (may be I am 
using it incorrectly)
@@ -85,7 +85,7 @@ class TestMLLearn(unittest.TestCase):
         y_train = y_digits[:int(.9 * n_samples)]
         X_test = X_digits[int(.9 * n_samples):]
         y_test = y_digits[int(.9 * n_samples):]
-        svm = SVM(sqlCtx, is_multi_class=True, transferUsingDF=True)
+        svm = SVM(sparkSession, is_multi_class=True, transferUsingDF=True)
         score = svm.fit(X_train, y_train).score(X_test, y_test)
         self.failUnless(score > 0.9)
 
@@ -97,7 +97,7 @@ class TestMLLearn(unittest.TestCase):
     #    # Both vectors and vectors_test are SciPy CSR matrix
     #    vectors = vectorizer.fit_transform(newsgroups_train.data)
     #    vectors_test = vectorizer.transform(newsgroups_test.data)
-    #    nb = NaiveBayes(sqlCtx)
+    #    nb = NaiveBayes(sparkSession)
     #    nb.fit(vectors, newsgroups_train.target)
     #    pred = nb.predict(vectors_test)
     #    score = metrics.f1_score(newsgroups_test.target, pred, 
average='weighted')

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/python/tests/test_mllearn_numpy.py
----------------------------------------------------------------------
diff --git a/src/main/python/tests/test_mllearn_numpy.py 
b/src/main/python/tests/test_mllearn_numpy.py
index d030837..925554f 100644
--- a/src/main/python/tests/test_mllearn_numpy.py
+++ b/src/main/python/tests/test_mllearn_numpy.py
@@ -36,7 +36,7 @@ import numpy as np
 from pyspark.context import SparkContext
 from pyspark.ml import Pipeline
 from pyspark.ml.feature import HashingTF, Tokenizer
-from pyspark.sql import SQLContext
+from pyspark.sql import SparkSession
 from sklearn import datasets, metrics, neighbors
 from sklearn.datasets import fetch_20newsgroups
 from sklearn.feature_extraction.text import TfidfVectorizer
@@ -44,7 +44,7 @@ from sklearn.feature_extraction.text import TfidfVectorizer
 from systemml.mllearn import LinearRegression, LogisticRegression, NaiveBayes, 
SVM
 
 sc = SparkContext()
-sqlCtx = SQLContext(sc)
+sparkSession = SparkSession.builder.getOrCreate()
 
 # Currently not integrated with JUnit test
 # ~/spark-1.6.1-scala-2.11/bin/spark-submit --master local[*] 
--driver-class-path SystemML.jar test.py
@@ -58,12 +58,12 @@ class TestMLLearn(unittest.TestCase):
         y_train = y_digits[:int(.9 * n_samples)]
         X_test = X_digits[int(.9 * n_samples):]
         y_test = y_digits[int(.9 * n_samples):]
-        logistic = LogisticRegression(sqlCtx)
+        logistic = LogisticRegression(sparkSession)
         score = logistic.fit(X_train, y_train).score(X_test, y_test)
         self.failUnless(score > 0.9)
     
     def test_logistic_mlpipeline(self):
-        training = sqlCtx.createDataFrame([
+        training = sparkSession.createDataFrame([
             ("a b c d e spark", 1.0),
             ("b d", 2.0),
             ("spark f g h", 1.0),
@@ -79,10 +79,10 @@ class TestMLLearn(unittest.TestCase):
             ], ["text", "label"])
         tokenizer = Tokenizer(inputCol="text", outputCol="words")
         hashingTF = HashingTF(inputCol="words", outputCol="features", 
numFeatures=20)
-        lr = LogisticRegression(sqlCtx)
+        lr = LogisticRegression(sparkSession)
         pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
         model = pipeline.fit(training)
-        test = sqlCtx.createDataFrame([
+        test = sparkSession.createDataFrame([
             ("spark i j k", 1.0),
             ("l m n", 2.0),
             ("mapreduce spark", 1.0),
@@ -101,7 +101,7 @@ class TestMLLearn(unittest.TestCase):
         diabetes_X_test = diabetes_X[-20:]
         diabetes_y_train = diabetes.target[:-20]
         diabetes_y_test = diabetes.target[-20:]
-        regr = LinearRegression(sqlCtx)
+        regr = LinearRegression(sparkSession)
         regr.fit(diabetes_X_train, diabetes_y_train)
         score = regr.score(diabetes_X_test, diabetes_y_test)
         self.failUnless(score > 0.4) # TODO: Improve r2-score (may be I am using it incorrectly)
@@ -115,7 +115,7 @@ class TestMLLearn(unittest.TestCase):
         y_train = y_digits[:int(.9 * n_samples)]
         X_test = X_digits[int(.9 * n_samples):]
         y_test = y_digits[int(.9 * n_samples):]
-        svm = SVM(sqlCtx, is_multi_class=True)
+        svm = SVM(sparkSession, is_multi_class=True)
         score = svm.fit(X_train, y_train).score(X_test, y_test)
         self.failUnless(score > 0.9)
 
@@ -128,7 +128,7 @@ class TestMLLearn(unittest.TestCase):
         y_train = y_digits[:int(.9 * n_samples)]
         X_test = X_digits[int(.9 * n_samples):]
         y_test = y_digits[int(.9 * n_samples):]
-        nb = NaiveBayes(sqlCtx)
+        nb = NaiveBayes(sparkSession)
         score = nb.fit(X_train, y_train).score(X_test, y_test)
         self.failUnless(score > 0.8)
         
@@ -140,7 +140,7 @@ class TestMLLearn(unittest.TestCase):
     #    # Both vectors and vectors_test are SciPy CSR matrix
     #    vectors = vectorizer.fit_transform(newsgroups_train.data)
     #    vectors_test = vectorizer.transform(newsgroups_test.data)
-    #    nb = NaiveBayes(sqlCtx)
+    #    nb = NaiveBayes(sparkSession)
     #    nb.fit(vectors, newsgroups_train.target)
     #    pred = nb.predict(vectors_test)
     #    score = metrics.f1_score(newsgroups_test.target, pred, average='weighted')

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
----------------------------------------------------------------------
diff --git 
a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala 
b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
index 1145463..fb9697d 100644
--- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
@@ -152,11 +152,11 @@ trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel {
       
     if(outputProb) {
       val prob = modelPredict.getDataFrame(probVar, true).withColumnRenamed("C1", "probability").select(RDDConverterUtils.DF_ID_COLUMN, "probability")
-      val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sqlContext, RDDConverterUtils.DF_ID_COLUMN)
+      val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sparkSession, RDDConverterUtils.DF_ID_COLUMN)
       return PredictionUtils.joinUsingID(dataset, PredictionUtils.joinUsingID(prob, predictedDF))
     }
     else {
-      val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sqlContext, RDDConverterUtils.DF_ID_COLUMN)
+      val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sparkSession, RDDConverterUtils.DF_ID_COLUMN)
       return PredictionUtils.joinUsingID(dataset, predictedDF)
     }
     

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala 
b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala
index c47fb3c..08154bb 100644
--- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala
@@ -80,7 +80,7 @@ trait BaseSystemMLRegressorModel extends BaseSystemMLEstimatorModel {
     val Xin_bin = new BinaryBlockMatrix(Xin, mcXin)
     val modelPredict = ml.execute(script._1.in(script._2, Xin_bin))
     val predictedDF = modelPredict.getDataFrame(predictionVar).select(RDDConverterUtils.DF_ID_COLUMN, "C1").withColumnRenamed("C1", "prediction")
-    val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sqlContext, RDDConverterUtils.DF_ID_COLUMN)
+    val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sparkSession, RDDConverterUtils.DF_ID_COLUMN)
     return PredictionUtils.joinUsingID(dataset, predictedDF)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala 
b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala
index c0e3f35..ce89502 100644
--- a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala
@@ -111,16 +111,16 @@ class LogisticRegressionModel(override val uid: String)(
  */
 object LogisticRegressionExample {
   import org.apache.spark.{ SparkConf, SparkContext }
+  import org.apache.spark.sql._
   import org.apache.spark.sql.types._
   import org.apache.spark.ml.linalg.Vectors
   import org.apache.spark.ml.feature.LabeledPoint
 
   def main(args: Array[String]) = {
-    val sparkConf: SparkConf = new SparkConf();
-    val sc: SparkContext = new SparkContext("local", "TestLocal", sparkConf);
-    val sqlContext = new org.apache.spark.sql.SQLContext(sc);
+    val sparkSession = SparkSession.builder().master("local").appName("TestLocal").getOrCreate();
+    val sc: SparkContext = sparkSession.sparkContext;
 
-    import sqlContext.implicits._
+    import sparkSession.implicits._
     val training = sc.parallelize(Seq(
       LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)),
       LabeledPoint(1.0, Vectors.dense(1.0, 0.4, 2.1)),
@@ -130,7 +130,7 @@ object LogisticRegressionExample {
       LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 2.3))))
     val lr = new LogisticRegression("log", sc)
     val lrmodel = lr.fit(training.toDF)
-    // lrmodel.mloutput.getDF(sqlContext, "B_out").show()
+    // lrmodel.mloutput.getDF(sparkSession, "B_out").show()
 
     val testing = sc.parallelize(Seq(
       LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)),
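
As the hunk above shows, the separate SparkConf, SparkContext, and SQLContext setup collapses into a single SparkSession builder, with the SparkContext still reachable from the session when RDD APIs are needed. A minimal Java sketch of the same pattern (the class name and app name here are illustrative, not part of the patch):

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;

    public class SessionBootstrap {
      public static void main(String[] args) {
        // One builder call replaces new SparkConf() + new SparkContext(...) + new SQLContext(sc)
        SparkSession spark = SparkSession.builder()
            .master("local")
            .appName("TestLocal")
            .getOrCreate();

        // The underlying SparkContext remains available for RDD-based code
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        System.out.println("Default parallelism: " + jsc.defaultParallelism());

        spark.stop();
      }
    }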

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java 
b/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java
index 5b25c71..e5ed921 100644
--- a/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java
@@ -719,13 +719,11 @@ public abstract class AutomatedTestBase
                TestUtils.writeTestScalar(baseDirectory + EXPECTED_DIR + cacheDir + name, value);
                expectedFiles.add(baseDirectory + EXPECTED_DIR + cacheDir + name);
        }
-       
-       @SuppressWarnings("deprecation")
+
        protected static HashMap<CellIndex, Double> readDMLMatrixFromHDFS(String fileName) {
                return TestUtils.readDMLMatrixFromHDFS(baseDirectory + OUTPUT_DIR + fileName);
        }
 
-       @SuppressWarnings("deprecation")
        public HashMap<CellIndex, Double> readRMatrixFromFS(String fileName) {
                System.out.println("R script out: " + baseDirectory + 
EXPECTED_DIR + cacheDir + fileName);
                return TestUtils.readRMatrixFromFS(baseDirectory + EXPECTED_DIR 
+ cacheDir + fileName);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/conversion/RDDConverterUtilsExtTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/conversion/RDDConverterUtilsExtTest.java
 
b/src/test/java/org/apache/sysml/test/integration/conversion/RDDConverterUtilsExtTest.java
index 7a69423..8ca1d8d 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/conversion/RDDConverterUtilsExtTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/conversion/RDDConverterUtilsExtTest.java
@@ -32,7 +32,7 @@ import org.apache.spark.api.java.function.Function;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
@@ -85,12 +85,12 @@ public class RDDConverterUtilsExtTest extends AutomatedTestBase {
                list.add("[1.2, 3.4]");
                JavaRDD<String> javaRddString = sc.parallelize(list);
                JavaRDD<Row> javaRddRow = javaRddString.map(new StringToRow());
-               SQLContext sqlContext = new SQLContext(sc);
+               SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
                List<StructField> fields = new ArrayList<StructField>();
                fields.add(DataTypes.createStructField("C1", DataTypes.StringType, true));
                StructType schema = DataTypes.createStructType(fields);
-               Dataset<Row> inDF = sqlContext.createDataFrame(javaRddRow, schema);
-               Dataset<Row> outDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(sqlContext, inDF);
+               Dataset<Row> inDF = sparkSession.createDataFrame(javaRddRow, schema);
+               Dataset<Row> outDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(sparkSession, inDF);
 
                List<String> expectedResults = new ArrayList<String>();
                expectedResults.add("[[1.2,4.3,3.4]]");
@@ -111,12 +111,12 @@ public class RDDConverterUtilsExtTest extends AutomatedTestBase {
                list.add(null);
                JavaRDD<String> javaRddString = sc.parallelize(list);
                JavaRDD<Row> javaRddRow = javaRddString.map(new StringToRow());
-               SQLContext sqlContext = new SQLContext(sc);
+               SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
                List<StructField> fields = new ArrayList<StructField>();
                fields.add(DataTypes.createStructField("C1", DataTypes.StringType, true));
                StructType schema = DataTypes.createStructType(fields);
-               Dataset<Row> inDF = sqlContext.createDataFrame(javaRddRow, schema);
-               Dataset<Row> outDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(sqlContext, inDF);
+               Dataset<Row> inDF = sparkSession.createDataFrame(javaRddRow, schema);
+               Dataset<Row> outDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(sparkSession, inDF);
 
                List<String> expectedResults = new ArrayList<String>();
                expectedResults.add("[[1.2,3.4]]");
@@ -134,12 +134,12 @@ public class RDDConverterUtilsExtTest extends AutomatedTestBase {
                list.add("[cheeseburger,fries]");
                JavaRDD<String> javaRddString = sc.parallelize(list);
                JavaRDD<Row> javaRddRow = javaRddString.map(new StringToRow());
-               SQLContext sqlContext = new SQLContext(sc);
+               SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
                List<StructField> fields = new ArrayList<StructField>();
                fields.add(DataTypes.createStructField("C1", DataTypes.StringType, true));
                StructType schema = DataTypes.createStructType(fields);
-               Dataset<Row> inDF = sqlContext.createDataFrame(javaRddRow, schema);
-               Dataset<Row> outDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(sqlContext, inDF);
+               Dataset<Row> inDF = sparkSession.createDataFrame(javaRddRow, schema);
+               Dataset<Row> outDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(sparkSession, inDF);
                // trigger evaluation to throw exception
                outDF.collectAsList();
        }
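
The converter tests above all obtain the session from an already-running JavaSparkContext rather than building one from scratch, and createDataFrame is then called on the session exactly as it used to be called on SQLContext. A condensed, self-contained Java sketch of that setup (the row values and column name are illustrative only):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    public class SessionFromExistingContext {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "SessionFromExistingContext");

        // Reuses the running context; replaces new SQLContext(sc)
        SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();

        List<Row> rows = Arrays.asList(RowFactory.create("[1.2, 3.4]"), RowFactory.create("[1.2, 3.4, 5.6]"));
        JavaRDD<Row> javaRddRow = sc.parallelize(rows);
        StructType schema = DataTypes.createStructType(Arrays.asList(
            DataTypes.createStructField("C1", DataTypes.StringType, true)));

        // Same createDataFrame call, now on the session
        Dataset<Row> inDF = sparkSession.createDataFrame(javaRddRow, schema);
        inDF.show(false);

        sparkSession.stop();
      }
    }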

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
index d5edf01..f626813 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/frame/FrameConverterTest.java
@@ -32,7 +32,7 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.StructType;
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
@@ -532,10 +532,10 @@ public class FrameConverterTest extends AutomatedTestBase
                                OutputInfo oinfo = OutputInfo.BinaryBlockOutputInfo;

                                //Create DataFrame 
-                               SQLContext sqlContext = new SQLContext(sc);
+                               SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
                                StructType dfSchema = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(lschema, false);
                                JavaRDD<Row> rowRDD = FrameRDDConverterUtils.csvToRowRDD(sc, fnameIn, separator, lschema);
-                               Dataset<Row> df = sqlContext.createDataFrame(rowRDD, dfSchema);
+                               Dataset<Row> df = sparkSession.createDataFrame(rowRDD, dfSchema);

                                JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils
                                                .dataFrameToBinaryBlock(sc, df, mc, false/*, columns*/)
@@ -549,7 +549,8 @@ public class FrameConverterTest extends AutomatedTestBase
                                JavaPairRDD<Long, FrameBlock> rddIn = sc
                                                .hadoopFile(fnameIn, iinfo.inputFormatClass, LongWritable.class, FrameBlock.class)
                                                .mapToPair(new LongWritableFrameToLongFrameFunction());
-                               Dataset<Row> df = FrameRDDConverterUtils.binaryBlockToDataFrame(new SQLContext(sc), rddIn, mc, lschema);
+                               SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
+                               Dataset<Row> df = FrameRDDConverterUtils.binaryBlockToDataFrame(sparkSession, rddIn, mc, lschema);

                                //Convert back DataFrame to binary block for comparison using original binary to converted DF and back to binary 
                                JavaPairRDD<LongWritable, FrameBlock> rddOut = FrameRDDConverterUtils

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
index d3fdc2a..199100e 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameMatrixConversionTest.java
@@ -23,8 +23,7 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
-import org.junit.Test;
+import org.apache.spark.sql.SparkSession;
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
 import org.apache.sysml.conf.ConfigurationManager;
@@ -38,6 +37,7 @@ import org.apache.sysml.runtime.util.DataConverter;
 import org.apache.sysml.test.integration.AutomatedTestBase;
 import org.apache.sysml.test.integration.TestConfiguration;
 import org.apache.sysml.test.utils.TestUtils;
+import org.junit.Test;
 
 
 public class DataFrameMatrixConversionTest extends AutomatedTestBase 
@@ -191,13 +191,13 @@ public class DataFrameMatrixConversionTest extends AutomatedTestBase
                        //setup spark context
                        sec = (SparkExecutionContext) ExecutionContextFactory.createContext();
                        JavaSparkContext sc = sec.getSparkContext();
-                       SQLContext sqlctx = new SQLContext(sc);
+                       SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();

                        //get binary block input rdd
                        JavaPairRDD<MatrixIndexes,MatrixBlock> in = SparkExecutionContext.toMatrixJavaPairRDD(sc, mbA, blksz, blksz);

                        //matrix - dataframe - matrix conversion
-                       Dataset<Row> df = RDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, vector);
+                       Dataset<Row> df = RDDConverterUtils.binaryBlockToDataFrame(sparkSession, in, mc1, vector);
                        df = ( rows==rows3 ) ? df.repartition(rows) : df;
                        JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true, vector);
                        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java
index 0e826a3..09628e5 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameRowFrameConversionTest.java
@@ -23,8 +23,7 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
-import org.junit.Test;
+import org.apache.spark.sql.SparkSession;
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
 import org.apache.sysml.conf.ConfigurationManager;
@@ -40,6 +39,7 @@ import org.apache.sysml.runtime.util.UtilFunctions;
 import org.apache.sysml.test.integration.AutomatedTestBase;
 import org.apache.sysml.test.integration.TestConfiguration;
 import org.apache.sysml.test.utils.TestUtils;
+import org.junit.Test;
 
 
 public class DataFrameRowFrameConversionTest extends AutomatedTestBase 
@@ -216,15 +216,16 @@ public class DataFrameRowFrameConversionTest extends AutomatedTestBase
                        //setup spark context
                        sec = (SparkExecutionContext) ExecutionContextFactory.createContext();
                        JavaSparkContext sc = sec.getSparkContext();
+                       SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
+
                        sc.getConf().set("spark.memory.offHeap.enabled", "false");
-                       SQLContext sqlctx = new SQLContext(sc);
-                       sqlctx.setConf("spark.sql.codegen.wholeStage", "false");
-                       
+                       sparkSession.conf().set("spark.sql.codegen.wholeStage", "false");
+
                        //get binary block input rdd
                        JavaPairRDD<Long,FrameBlock> in = SparkExecutionContext.toFrameJavaPairRDD(sc, fbA);

                        //frame - dataframe - frame conversion
-                       Dataset<Row> df = FrameRDDConverterUtils.binaryBlockToDataFrame(sqlctx, in, mc1, schema);
+                       Dataset<Row> df = FrameRDDConverterUtils.binaryBlockToDataFrame(sparkSession, in, mc1, schema);
                        JavaPairRDD<Long,FrameBlock> out = FrameRDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, true);
                        
                        //get output frame block
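
One behavioural detail in the hunk above: per-query SQL settings that previously went through sqlctx.setConf are now applied via the session's runtime configuration. A small Java sketch of that call, using the same configuration key as the test (class and app names are illustrative):

    import org.apache.spark.sql.SparkSession;

    public class RuntimeConfExample {
      public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().master("local").appName("RuntimeConfExample").getOrCreate();

        // Replaces sqlctx.setConf("spark.sql.codegen.wholeStage", "false")
        sparkSession.conf().set("spark.sql.codegen.wholeStage", "false");
        System.out.println(sparkSession.conf().get("spark.sql.codegen.wholeStage"));

        sparkSession.stop();
      }
    }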

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java
index c6d2251..4a73376 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorFrameConversionTest.java
@@ -30,12 +30,11 @@ import org.apache.spark.ml.linalg.VectorUDT;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
-import org.junit.Test;
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
 import org.apache.sysml.conf.ConfigurationManager;
@@ -53,6 +52,7 @@ import org.apache.sysml.runtime.util.UtilFunctions;
 import org.apache.sysml.test.integration.AutomatedTestBase;
 import org.apache.sysml.test.integration.TestConfiguration;
 import org.apache.sysml.test.utils.TestUtils;
+import org.junit.Test;
 
 
 public class DataFrameVectorFrameConversionTest extends AutomatedTestBase 
@@ -268,10 +268,10 @@ public class DataFrameVectorFrameConversionTest extends AutomatedTestBase
                        //setup spark context
                        sec = (SparkExecutionContext) ExecutionContextFactory.createContext();
                        JavaSparkContext sc = sec.getSparkContext();
-                       SQLContext sqlctx = new SQLContext(sc);
+                       SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();

                        //create input data frame
-                       Dataset<Row> df = createDataFrame(sqlctx, mbA, containsID, schema);
+                       Dataset<Row> df = createDataFrame(sparkSession, mbA, containsID, schema);

                        //dataframe - frame conversion
                        JavaPairRDD<Long,FrameBlock> out = FrameRDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc2, containsID);
@@ -294,17 +294,9 @@ public class DataFrameVectorFrameConversionTest extends AutomatedTestBase
                        DMLScript.rtplatform = oldPlatform;
                }
        }
-       
-       /**
-        * 
-        * @param sqlctx
-        * @param mb
-        * @param schema
-        * @return
-        * @throws DMLRuntimeException 
-        */
+
        @SuppressWarnings("resource")
-       private Dataset<Row> createDataFrame(SQLContext sqlctx, MatrixBlock mb, boolean containsID, ValueType[] schema) 
+       private Dataset<Row> createDataFrame(SparkSession sparkSession, MatrixBlock mb, boolean containsID, ValueType[] schema) 
                throws DMLRuntimeException
        {
                //create in-memory list of rows
@@ -350,8 +342,8 @@ public class DataFrameVectorFrameConversionTest extends AutomatedTestBase
                StructType dfSchema = DataTypes.createStructType(fields);
                                
                //create rdd and data frame
-               JavaSparkContext sc = new JavaSparkContext(sqlctx.sparkContext());
+               JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());
                JavaRDD<Row> rowRDD = sc.parallelize(list);
-               return sqlctx.createDataFrame(rowRDD, dfSchema);
+               return sparkSession.createDataFrame(rowRDD, dfSchema);
        }
 }
\ No newline at end of file
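
The createDataFrame helper above also shows the reverse lookup: when only a SparkSession is at hand, the JavaSparkContext needed for parallelize is recovered from it. A minimal Java sketch under that assumption (row contents and column names are illustrative):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    public class RowsFromSession {
      public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().master("local").appName("RowsFromSession").getOrCreate();

        // Replaces new JavaSparkContext(sqlctx.sparkContext())
        JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());

        List<Row> list = Arrays.asList(RowFactory.create(1.0, 2.0), RowFactory.create(3.0, 4.0));
        JavaRDD<Row> rowRDD = sc.parallelize(list);
        StructType dfSchema = DataTypes.createStructType(Arrays.asList(
            DataTypes.createStructField("C1", DataTypes.DoubleType, false),
            DataTypes.createStructField("C2", DataTypes.DoubleType, false)));

        Dataset<Row> df = sparkSession.createDataFrame(rowRDD, dfSchema);
        System.out.println(df.count());

        sparkSession.stop();
      }
    }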

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorScriptTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorScriptTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorScriptTest.java
index 14ed4b7..92677b8 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorScriptTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/DataFrameVectorScriptTest.java
@@ -32,12 +32,11 @@ import org.apache.spark.ml.linalg.VectorUDT;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
-import org.junit.Test;
 import org.apache.sysml.api.mlcontext.FrameFormat;
 import org.apache.sysml.api.mlcontext.FrameMetadata;
 import org.apache.sysml.api.mlcontext.MLContext;
@@ -55,6 +54,7 @@ import org.apache.sysml.runtime.util.UtilFunctions;
 import org.apache.sysml.test.integration.AutomatedTestBase;
 import org.apache.sysml.test.integration.TestConfiguration;
 import org.apache.sysml.test.utils.TestUtils;
+import org.junit.Test;
 
 
 public class DataFrameVectorScriptTest extends AutomatedTestBase 
@@ -269,10 +269,10 @@ public class DataFrameVectorScriptTest extends AutomatedTestBase
                        SparkConf conf = SparkExecutionContext.createSystemMLSparkConf()
                                        .setAppName("MLContextFrameTest").setMaster("local");
                        sc = new JavaSparkContext(conf);
-                       SQLContext sqlctx = new SQLContext(sc);
+                       SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();

                        //create input data frame
-                       Dataset<Row> df = createDataFrame(sqlctx, mbA, containsID, schema);
+                       Dataset<Row> df = createDataFrame(sparkSession, mbA, containsID, schema);

                        // Create full frame metadata, and empty frame metadata
                        FrameMetadata meta = new FrameMetadata(containsID ? FrameFormat.DF_WITH_INDEX :
@@ -315,17 +315,9 @@ public class DataFrameVectorScriptTest extends AutomatedTestBase
                                ml.close();
                }
        }
-       
-       /**
-        * 
-        * @param sqlctx
-        * @param mb
-        * @param schema
-        * @return
-        * @throws DMLRuntimeException 
-        */
+
        @SuppressWarnings("resource")
-       private Dataset<Row> createDataFrame(SQLContext sqlctx, MatrixBlock mb, boolean containsID, ValueType[] schema) 
+       private Dataset<Row> createDataFrame(SparkSession sparkSession, MatrixBlock mb, boolean containsID, ValueType[] schema) 
                throws DMLRuntimeException
        {
                //create in-memory list of rows
@@ -371,8 +363,8 @@ public class DataFrameVectorScriptTest extends AutomatedTestBase
                StructType dfSchema = DataTypes.createStructType(fields);
                                
                //create rdd and data frame
-               JavaSparkContext sc = new JavaSparkContext(sqlctx.sparkContext());
+               JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());
                JavaRDD<Row> rowRDD = sc.parallelize(list);
-               return sqlctx.createDataFrame(rowRDD, dfSchema);
+               return sparkSession.createDataFrame(rowRDD, dfSchema);
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
index e6a947f..d485c48 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/mlcontext/FrameTest.java
@@ -33,7 +33,7 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.StructType;
 import org.apache.sysml.api.DMLException;
 import org.apache.sysml.api.DMLScript;
@@ -68,7 +68,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 
-@SuppressWarnings("deprecation")
 public class FrameTest extends AutomatedTestBase 
 {
        private final static String TEST_DIR = "functions/frame/";
@@ -238,15 +237,16 @@ public class FrameTest extends AutomatedTestBase
                if(bFromDataFrame)
                {
                        //Create DataFrame for input A 
-                       SQLContext sqlContext = new SQLContext(sc);
+                       SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
                        StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schema, false);
+
                        JavaRDD<Row> rowRDDA = FrameRDDConverterUtils.csvToRowRDD(sc, input("A"), DataExpression.DEFAULT_DELIM_DELIMITER, schema);
-                       dfA = sqlContext.createDataFrame(rowRDDA, dfSchemaA);
+                       dfA = sparkSession.createDataFrame(rowRDDA, dfSchemaA);

                        //Create DataFrame for input B 
                        StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schemaB, false);
                        JavaRDD<Row> rowRDDB = FrameRDDConverterUtils.csvToRowRDD(sc, input("B"), DataExpression.DEFAULT_DELIM_DELIMITER, schemaB);
-                       dfB = sqlContext.createDataFrame(rowRDDB, dfSchemaB);
+                       dfB = sparkSession.createDataFrame(rowRDDB, dfSchemaB);
                }
 
                try 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/80ab57bd/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java
 
b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java
index 0b3fac4..6dd74d3 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextFrameTest.java
@@ -34,7 +34,7 @@ import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
@@ -53,7 +53,6 @@ import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysml.test.integration.AutomatedTestBase;
 import org.apache.sysml.test.integration.mlcontext.MLContextTest.CommaSeparatedValueStringToDoubleArrayRow;
-
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -239,11 +238,11 @@ public class MLContextFrameTest extends AutomatedTestBase {
                                JavaRDD<Row> javaRddRowB = FrameRDDConverterUtils.csvToRowRDD(sc, javaRDDB, CSV_DELIM, schemaB);

                                // Create DataFrame
-                               SQLContext sqlContext = new SQLContext(sc);
+                               SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
                                StructType dfSchemaA = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schemaA, false);
-                               Dataset<Row> dataFrameA = sqlContext.createDataFrame(javaRddRowA, dfSchemaA);
+                               Dataset<Row> dataFrameA = sparkSession.createDataFrame(javaRddRowA, dfSchemaA);
                                StructType dfSchemaB = FrameRDDConverterUtils.convertFrameSchemaToDFSchema(schemaB, false);
-                               Dataset<Row> dataFrameB = sqlContext.createDataFrame(javaRddRowB, dfSchemaB);
+                               Dataset<Row> dataFrameB = sparkSession.createDataFrame(javaRddRowB, dfSchemaB);
                                if (script_type == SCRIPT_TYPE.DML)
                                        script = dml("A[2:3,2:4]=B;C=A[2:3,2:3]").in("A", dataFrameA, fmA).in("B", dataFrameB, fmB).out("A")
                                                        .out("C");
@@ -493,18 +492,18 @@ public class MLContextFrameTest extends AutomatedTestBase {
                JavaRDD<Row> javaRddRowA = FrameRDDConverterUtils.csvToRowRDD(sc, javaRddStringA, CSV_DELIM, schema);
                JavaRDD<Row> javaRddRowB = javaRddStringB.map(new CommaSeparatedValueStringToDoubleArrayRow());

-               SQLContext sqlContext = new SQLContext(sc);
+               SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();

                List<StructField> fieldsA = new ArrayList<StructField>();
                fieldsA.add(DataTypes.createStructField("1", DataTypes.StringType, true));
                fieldsA.add(DataTypes.createStructField("2", DataTypes.DoubleType, true));
                StructType schemaA = DataTypes.createStructType(fieldsA);
-               Dataset<Row> dataFrameA = sqlContext.createDataFrame(javaRddRowA, schemaA);
+               Dataset<Row> dataFrameA = sparkSession.createDataFrame(javaRddRowA, schemaA);

                List<StructField> fieldsB = new ArrayList<StructField>();
                fieldsB.add(DataTypes.createStructField("1", DataTypes.DoubleType, true));
                StructType schemaB = DataTypes.createStructType(fieldsB);
-               Dataset<Row> dataFrameB = sqlContext.createDataFrame(javaRddRowB, schemaB);
+               Dataset<Row> dataFrameB = sparkSession.createDataFrame(javaRddRowB, schemaB);

                String dmlString = "[tA, tAM] = transformencode (target = A, spec = \"{ids: true ,recode: [ 1, 2 ]}\");\n"
                                + "C = tA %*% B;\n" + "M = s * C;";
@@ -530,14 +529,14 @@ public class MLContextFrameTest extends AutomatedTestBase {

                JavaRDD<Row> javaRddRowA = sc. parallelize( Arrays.asList(rowsA)); 

-               SQLContext sqlContext = new SQLContext(sc);
+               SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();

                List<StructField> fieldsA = new ArrayList<StructField>();
                fieldsA.add(DataTypes.createStructField("myID", DataTypes.StringType, true));
                fieldsA.add(DataTypes.createStructField("FeatureName", DataTypes.StringType, true));
                fieldsA.add(DataTypes.createStructField("FeatureValue", DataTypes.IntegerType, true));
                StructType schemaA = DataTypes.createStructType(fieldsA);
-               Dataset<Row> dataFrameA = sqlContext.createDataFrame(javaRddRowA, schemaA);
+               Dataset<Row> dataFrameA = sparkSession.createDataFrame(javaRddRowA, schemaA);

                String dmlString = "[tA, tAM] = transformencode (target = A, spec = \"{ids: false ,recode: [ myID, FeatureName ]}\");";
 
@@ -572,14 +571,14 @@ public class MLContextFrameTest extends AutomatedTestBase {

                JavaRDD<Row> javaRddRowA = sc. parallelize( Arrays.asList(rowsA)); 

-               SQLContext sqlContext = new SQLContext(sc);
+               SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();

                List<StructField> fieldsA = new ArrayList<StructField>();
                fieldsA.add(DataTypes.createStructField("featureName", DataTypes.StringType, true));
                fieldsA.add(DataTypes.createStructField("featureValue", DataTypes.IntegerType, true));
                fieldsA.add(DataTypes.createStructField("id", DataTypes.StringType, true));
                StructType schemaA = DataTypes.createStructType(fieldsA);
-               Dataset<Row> dataFrameA = sqlContext.createDataFrame(javaRddRowA, schemaA);
+               Dataset<Row> dataFrameA = sparkSession.createDataFrame(javaRddRowA, schemaA);

                String dmlString = "[tA, tAM] = transformencode (target = A, spec = \"{ids: false ,recode: [ featureName, id ]}\");";
 
@@ -622,7 +621,7 @@ public class MLContextFrameTest extends AutomatedTestBase {
        // JavaRDD<Row> javaRddRowA = javaRddStringA.map(new
        // CommaSeparatedValueStringToRow());
        //
-       // SQLContext sqlContext = new SQLContext(sc);
+       // SparkSession sparkSession = SparkSession.builder().sparkContext(sc.sc()).getOrCreate();
        //
        // List<StructField> fieldsA = new ArrayList<StructField>();
        // fieldsA.add(DataTypes.createStructField("1", DataTypes.StringType,
@@ -630,7 +629,7 @@ public class MLContextFrameTest extends AutomatedTestBase {
        // fieldsA.add(DataTypes.createStructField("2", DataTypes.StringType,
        // true));
        // StructType schemaA = DataTypes.createStructType(fieldsA);
-       // DataFrame dataFrameA = sqlContext.createDataFrame(javaRddRowA, schemaA);
+       // DataFrame dataFrameA = sparkSession.createDataFrame(javaRddRowA, schemaA);
        //
        // String dmlString = "[tA, tAM] = transformencode (target = A, spec =
        // \"{ids: true ,recode: [ 1, 2 ]}\");\n";

