Repository: spark
Updated Branches:
  refs/heads/branch-2.0 2cb84dd23 -> a355edeef


[SPARK-15603][MLLIB] Replace SQLContext with SparkSession in ML/MLLib

## What changes were proposed in this pull request?

This PR replaces all deprecated `SQLContext` occurrences with `SparkSession` in the
`ML/MLLib` module, except for the following two classes, which use `SQLContext` in
their function signatures (the general replacement pattern is sketched after the list).
- ReadWrite.scala
- TreeModels.scala
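
The change follows a single pattern: obtain a `SparkSession` (seeded with the existing `SparkContext`'s configuration) instead of calling `SQLContext.getOrCreate(sc)`, and build DataFrames with `createDataFrame` instead of `rdd.toDF()`. Below is a minimal sketch of that pattern, using a hypothetical `Data` case class and save path rather than any real MLlib model:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object SaveLoadSketch {
  // Hypothetical payload; the MLlib save/load helpers persist their own case classes this way.
  case class Data(weights: Array[Double], intercept: Double)

  def save(sc: SparkContext, path: String, data: Data): Unit = {
    // Before: SQLContext.getOrCreate(sc), import sqlContext.implicits._, then toDF()
    // After: reuse the existing context's conf to get (or create) a SparkSession.
    val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
    spark.createDataFrame(Seq(data)).repartition(1).write.parquet(path)
  }

  def load(sc: SparkContext, path: String): Unit = {
    val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
    spark.read.parquet(path).select("weights", "intercept").show()
  }
}
```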

## How was this patch tested?

Passes the existing Jenkins tests.

Author: Dongjoon Hyun <dongj...@apache.org>

Closes #13352 from dongjoon-hyun/SPARK-15603.

(cherry picked from commit d24e251572d39a453293cabfe14e4aed25a55208)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a355edee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a355edee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a355edee

Branch: refs/heads/branch-2.0
Commit: a355edeefa16988da8b05d2539a91277e75e823c
Parents: 2cb84dd
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Fri May 27 11:09:15 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Fri May 27 11:09:33 2016 -0700

----------------------------------------------------------------------
 .../spark/ml/clustering/GaussianMixture.scala   |  7 ++--
 .../spark/ml/feature/SQLTransformer.scala       | 14 ++++----
 .../org/apache/spark/ml/feature/Word2Vec.scala  | 16 ++++-----
 .../spark/mllib/api/python/PythonMLLibAPI.scala | 23 +++++++------
 .../classification/LogisticRegression.scala     | 19 +++++------
 .../spark/mllib/classification/NaiveBayes.scala | 24 ++++++-------
 .../impl/GLMClassificationModel.scala           | 18 +++++-----
 .../mllib/clustering/BisectingKMeansModel.scala | 12 +++----
 .../mllib/clustering/GaussianMixtureModel.scala | 16 ++++-----
 .../spark/mllib/clustering/KMeansModel.scala    | 13 ++++---
 .../spark/mllib/clustering/LDAModel.scala       | 36 +++++++++-----------
 .../clustering/PowerIterationClustering.scala   | 12 +++----
 .../spark/mllib/feature/ChiSqSelector.scala     | 13 ++++---
 .../apache/spark/mllib/feature/Word2Vec.scala   | 13 +++----
 .../org/apache/spark/mllib/fpm/FPGrowth.scala   | 10 +++---
 .../org/apache/spark/mllib/fpm/PrefixSpan.scala | 10 +++---
 .../MatrixFactorizationModel.scala              | 12 +++----
 .../mllib/regression/IsotonicRegression.scala   | 12 +++----
 .../regression/impl/GLMRegressionModel.scala    | 18 +++++-----
 .../mllib/tree/model/DecisionTreeModel.scala    | 20 +++++------
 .../mllib/tree/model/treeEnsembleModels.scala   | 17 +++++----
 .../ml/feature/QuantileDiscretizerSuite.scala   | 14 ++++----
 .../mllib/util/MLlibTestSparkContext.scala      |  6 ++--
 23 files changed, 160 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
index 88b6b27..773e50e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
@@ -20,7 +20,6 @@ package org.apache.spark.ml.clustering
 import breeze.linalg.{DenseVector => BDV}
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.ml.impl.Utils.EPSILON
@@ -33,7 +32,7 @@ import org.apache.spark.mllib.clustering.{GaussianMixture => MLlibGM}
 import org.apache.spark.mllib.linalg.{Matrices => OldMatrices, Matrix => OldMatrix,
   Vector => OldVector, Vectors => OldVectors, VectorUDT => OldVectorUDT}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.functions.{col, udf}
 import org.apache.spark.sql.types.{IntegerType, StructType}
 
@@ -134,9 +133,7 @@ class GaussianMixtureModel private[ml] (
     val modelGaussians = gaussians.map { gaussian =>
       (OldVectors.fromML(gaussian.mean), OldMatrices.fromML(gaussian.cov))
     }
-    val sc = SparkContext.getOrCreate()
-    val sqlContext = SQLContext.getOrCreate(sc)
-    sqlContext.createDataFrame(modelGaussians).toDF("mean", "cov")
+    SparkSession.builder().getOrCreate().createDataFrame(modelGaussians).toDF("mean", "cov")
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/main/scala/org/apache/spark/ml/feature/SQLTransformer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/SQLTransformer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/SQLTransformer.scala
index 2d4cac6..bd8f949 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/SQLTransformer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/SQLTransformer.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark.ml.feature
 
-import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.ml.param.{Param, ParamMap}
 import org.apache.spark.ml.Transformer
 import org.apache.spark.ml.util._
-import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -74,15 +73,14 @@ class SQLTransformer @Since("1.6.0") (override val uid: String) extends Transfor
 
   @Since("1.6.0")
   override def transformSchema(schema: StructType): StructType = {
-    val sc = SparkContext.getOrCreate()
-    val sqlContext = SQLContext.getOrCreate(sc)
-    val dummyRDD = sc.parallelize(Seq(Row.empty))
-    val dummyDF = sqlContext.createDataFrame(dummyRDD, schema)
+    val spark = SparkSession.builder().getOrCreate()
+    val dummyRDD = spark.sparkContext.parallelize(Seq(Row.empty))
+    val dummyDF = spark.createDataFrame(dummyRDD, schema)
     val tableName = Identifiable.randomUID(uid)
     val realStatement = $(statement).replace(tableIdentifier, tableName)
     dummyDF.createOrReplaceTempView(tableName)
-    val outputSchema = sqlContext.sql(realStatement).schema
-    sqlContext.dropTempTable(tableName)
+    val outputSchema = spark.sql(realStatement).schema
+    spark.catalog.dropTempView(tableName)
     outputSchema
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
index 1469bfd..1b929cd 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -28,7 +28,7 @@ import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
 import org.apache.spark.mllib.feature
 import org.apache.spark.mllib.linalg.VectorImplicits._
-import org.apache.spark.sql.{DataFrame, Dataset, SQLContext}
+import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 
@@ -183,11 +183,9 @@ class Word2VecModel private[ml] (
    * and the vector the DenseVector that it is mapped to.
    */
   @transient lazy val getVectors: DataFrame = {
-    val sc = SparkContext.getOrCreate()
-    val sqlContext = SQLContext.getOrCreate(sc)
-    import sqlContext.implicits._
+    val spark = SparkSession.builder().getOrCreate()
     val wordVec = wordVectors.getVectors.mapValues(vec => Vectors.dense(vec.map(_.toDouble)))
-    sc.parallelize(wordVec.toSeq).toDF("word", "vector")
+    spark.createDataFrame(wordVec.toSeq).toDF("word", "vector")
   }
 
   /**
@@ -205,10 +203,8 @@ class Word2VecModel private[ml] (
    * synonyms and the given word vector.
    */
   def findSynonyms(word: Vector, num: Int): DataFrame = {
-    val sc = SparkContext.getOrCreate()
-    val sqlContext = SQLContext.getOrCreate(sc)
-    import sqlContext.implicits._
-    sc.parallelize(wordVectors.findSynonyms(word, num)).toDF("word", "similarity")
+    val spark = SparkSession.builder().getOrCreate()
+    spark.createDataFrame(wordVectors.findSynonyms(word, num)).toDF("word", "similarity")
   }
 
   /** @group setParam */
@@ -230,7 +226,7 @@ class Word2VecModel private[ml] (
     val bVectors = dataset.sparkSession.sparkContext.broadcast(vectors)
     val d = $(vectorSize)
     val word2Vec = udf { sentence: Seq[String] =>
-      if (sentence.size == 0) {
+      if (sentence.isEmpty) {
         Vectors.sparse(d, Array.empty[Int], Array.empty[Double])
       } else {
         val sum = Vectors.zeros(d)

http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 667290e..6e0ed37 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -30,8 +30,7 @@ import net.razorvine.pickle._
 
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.api.python.SerDeUtil
-import org.apache.spark.ml.feature.{LabeledPoint => MLLabeledPoint}
-import org.apache.spark.ml.linalg.{DenseMatrix => NewDenseMatrix, DenseVector => NewDenseVector, SparseMatrix => NewSparseMatrix, SparseVector => NewSparseVector, Vector => NewVector, Vectors => NewVectors}
+import org.apache.spark.ml.linalg.{DenseMatrix => NewDenseMatrix, DenseVector => NewDenseVector, SparseMatrix => NewSparseMatrix, SparseVector => NewSparseVector, Vectors => NewVectors}
 import org.apache.spark.mllib.classification._
 import org.apache.spark.mllib.clustering._
 import org.apache.spark.mllib.evaluation.RankingMetrics
@@ -43,8 +42,7 @@ import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.random.{RandomRDDs => RG}
 import org.apache.spark.mllib.recommendation._
 import org.apache.spark.mllib.regression._
-import org.apache.spark.mllib.stat.{
-  KernelDensity, MultivariateStatisticalSummary, Statistics}
+import org.apache.spark.mllib.stat.{KernelDensity, MultivariateStatisticalSummary, Statistics}
 import org.apache.spark.mllib.stat.correlation.CorrelationNames
 import org.apache.spark.mllib.stat.distribution.MultivariateGaussian
 import org.apache.spark.mllib.stat.test.{ChiSqTestResult, KolmogorovSmirnovTestResult}
@@ -56,7 +54,7 @@ import org.apache.spark.mllib.tree.model.{DecisionTreeModel, GradientBoostedTree
   RandomForestModel}
 import org.apache.spark.mllib.util.{LinearDataGenerator, MLUtils}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
 
@@ -1178,8 +1176,9 @@ private[python] class PythonMLLibAPI extends Serializable {
   def getIndexedRows(indexedRowMatrix: IndexedRowMatrix): DataFrame = {
     // We use DataFrames for serialization of IndexedRows to Python,
     // so return a DataFrame.
-    val sqlContext = SQLContext.getOrCreate(indexedRowMatrix.rows.sparkContext)
-    sqlContext.createDataFrame(indexedRowMatrix.rows)
+    val sc = indexedRowMatrix.rows.sparkContext
+    val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+    spark.createDataFrame(indexedRowMatrix.rows)
   }
 
   /**
@@ -1188,8 +1187,9 @@ private[python] class PythonMLLibAPI extends Serializable {
   def getMatrixEntries(coordinateMatrix: CoordinateMatrix): DataFrame = {
     // We use DataFrames for serialization of MatrixEntry entries to
     // Python, so return a DataFrame.
-    val sqlContext = SQLContext.getOrCreate(coordinateMatrix.entries.sparkContext)
-    sqlContext.createDataFrame(coordinateMatrix.entries)
+    val sc = coordinateMatrix.entries.sparkContext
+    val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+    spark.createDataFrame(coordinateMatrix.entries)
   }
 
   /**
@@ -1198,8 +1198,9 @@ private[python] class PythonMLLibAPI extends Serializable {
   def getMatrixBlocks(blockMatrix: BlockMatrix): DataFrame = {
     // We use DataFrames for serialization of sub-matrix blocks to
     // Python, so return a DataFrame.
-    val sqlContext = SQLContext.getOrCreate(blockMatrix.blocks.sparkContext)
-    sqlContext.createDataFrame(blockMatrix.blocks)
+    val sc = blockMatrix.blocks.sparkContext
+    val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+    spark.createDataFrame(blockMatrix.blocks)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
index 4bba2ea..b186ca3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
@@ -28,7 +28,7 @@ import org.apache.spark.mllib.pmml.PMMLExportable
 import org.apache.spark.mllib.regression._
 import org.apache.spark.mllib.util.{DataValidators, Loader, Saveable}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.storage.StorageLevel
 
 /**
@@ -422,7 +422,7 @@ class LogisticRegressionWithLBFGS
       LogisticRegressionModel = {
     // ml's Logistic regression only supports binary classification currently.
     if (numOfLinearPredictor == 1) {
-      def runWithMlLogisitcRegression(elasticNetParam: Double) = {
+      def runWithMlLogisticRegression(elasticNetParam: Double) = {
         // Prepare the ml LogisticRegression based on our settings
         val lr = new org.apache.spark.ml.classification.LogisticRegression()
         lr.setRegParam(optimizer.getRegParam())
@@ -437,20 +437,19 @@ class LogisticRegressionWithLBFGS
         lr.setMaxIter(optimizer.getNumIterations())
         lr.setTol(optimizer.getConvergenceTol())
         // Convert our input into a DataFrame
-        val sqlContext = SQLContext.getOrCreate(input.context)
-        import sqlContext.implicits._
-        val df = input.map(_.asML).toDF()
+        val spark = SparkSession.builder().config(input.context.getConf).getOrCreate()
+        val df = spark.createDataFrame(input.map(_.asML))
         // Determine if we should cache the DF
         val handlePersistence = input.getStorageLevel == StorageLevel.NONE
         // Train our model
-        val mlLogisticRegresionModel = lr.train(df, handlePersistence)
+        val mlLogisticRegressionModel = lr.train(df, handlePersistence)
         // convert the model
-        val weights = Vectors.dense(mlLogisticRegresionModel.coefficients.toArray)
-        createModel(weights, mlLogisticRegresionModel.intercept)
+        val weights = Vectors.dense(mlLogisticRegressionModel.coefficients.toArray)
+        createModel(weights, mlLogisticRegressionModel.intercept)
       }
       optimizer.getUpdater() match {
-        case x: SquaredL2Updater => runWithMlLogisitcRegression(0.0)
-        case x: L1Updater => runWithMlLogisitcRegression(1.0)
+        case x: SquaredL2Updater => runWithMlLogisticRegression(0.0)
+        case x: L1Updater => runWithMlLogisticRegression(1.0)
         case _ => super.run(input, initialWeights)
       }
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index eb3ee41..452802f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -31,7 +31,7 @@ import org.apache.spark.mllib.linalg.{BLAS, DenseMatrix, DenseVector, SparseVect
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.SparkSession
 
 /**
  * Model for Naive Bayes Classifiers.
@@ -193,8 +193,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
         modelType: String)
 
     def save(sc: SparkContext, path: String, data: Data): Unit = {
-      val sqlContext = SQLContext.getOrCreate(sc)
-      import sqlContext.implicits._
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
 
       // Create JSON metadata.
       val metadata = compact(render(
@@ -203,15 +202,14 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
       sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
 
       // Create Parquet data.
-      val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF()
-      dataRDD.write.parquet(dataPath(path))
+      spark.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath(path))
     }
 
     @Since("1.3.0")
     def load(sc: SparkContext, path: String): NaiveBayesModel = {
-      val sqlContext = SQLContext.getOrCreate(sc)
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
       // Load Parquet data.
-      val dataRDD = sqlContext.read.parquet(dataPath(path))
+      val dataRDD = spark.read.parquet(dataPath(path))
       // Check schema explicitly since erasure makes it hard to use match-case for checking.
       checkSchema[Data](dataRDD.schema)
       val dataArray = dataRDD.select("labels", "pi", "theta", "modelType").take(1)
@@ -240,8 +238,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
         theta: Array[Array[Double]])
 
     def save(sc: SparkContext, path: String, data: Data): Unit = {
-      val sqlContext = SQLContext.getOrCreate(sc)
-      import sqlContext.implicits._
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
 
       // Create JSON metadata.
       val metadata = compact(render(
@@ -250,14 +247,13 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
       sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
 
       // Create Parquet data.
-      val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF()
-      dataRDD.write.parquet(dataPath(path))
+      spark.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath(path))
     }
 
     def load(sc: SparkContext, path: String): NaiveBayesModel = {
-      val sqlContext = SQLContext.getOrCreate(sc)
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
       // Load Parquet data.
-      val dataRDD = sqlContext.read.parquet(dataPath(path))
+      val dataRDD = spark.read.parquet(dataPath(path))
       // Check schema explicitly since erasure makes it hard to use match-case for checking.
       checkSchema[Data](dataRDD.schema)
       val dataArray = dataRDD.select("labels", "pi", "theta").take(1)
@@ -327,7 +323,7 @@ class NaiveBayes private (
   @Since("0.9.0")
   def setLambda(lambda: Double): NaiveBayes = {
     require(lambda >= 0,
-      s"Smoothing parameter must be nonnegative but got ${lambda}")
+      s"Smoothing parameter must be nonnegative but got $lambda")
     this.lambda = lambda
     this
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
index 4308ae0..32e323d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
@@ -23,7 +23,7 @@ import org.json4s.jackson.JsonMethods._
 import org.apache.spark.SparkContext
 import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.util.Loader
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
 
 /**
  * Helper class for import/export of GLM classification models.
@@ -51,8 +51,7 @@ private[classification] object GLMClassificationModel {
         weights: Vector,
         intercept: Double,
         threshold: Option[Double]): Unit = {
-      val sqlContext = SQLContext.getOrCreate(sc)
-      import sqlContext.implicits._
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
 
       // Create JSON metadata.
       val metadata = compact(render(
@@ -62,7 +61,7 @@ private[classification] object GLMClassificationModel {
 
       // Create Parquet data.
       val data = Data(weights, intercept, threshold)
-      sc.parallelize(Seq(data), 1).toDF().write.parquet(Loader.dataPath(path))
+      spark.createDataFrame(Seq(data)).repartition(1).write.parquet(Loader.dataPath(path))
     }
 
     /**
@@ -73,13 +72,13 @@ private[classification] object GLMClassificationModel {
      * @param modelClass  String name for model class (used for error messages)
      */
     def loadData(sc: SparkContext, path: String, modelClass: String): Data = {
-      val datapath = Loader.dataPath(path)
-      val sqlContext = SQLContext.getOrCreate(sc)
-      val dataRDD = sqlContext.read.parquet(datapath)
+      val dataPath = Loader.dataPath(path)
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+      val dataRDD = spark.read.parquet(dataPath)
       val dataArray = dataRDD.select("weights", "intercept", "threshold").take(1)
-      assert(dataArray.length == 1, s"Unable to load $modelClass data from: $datapath")
+      assert(dataArray.length == 1, s"Unable to load $modelClass data from: $dataPath")
       val data = dataArray(0)
-      assert(data.size == 3, s"Unable to load $modelClass data from: $datapath")
+      assert(data.size == 3, s"Unable to load $modelClass data from: $dataPath")
       val (weights, intercept) = data match {
         case Row(weights: Vector, intercept: Double, _) =>
           (weights, intercept)
@@ -92,5 +91,4 @@ private[classification] object GLMClassificationModel {
       Data(weights, intercept, threshold)
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
index c3b5b8b..510a91b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
@@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
 
 /**
  * Clustering model produced by [[BisectingKMeans]].
@@ -144,8 +144,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
     val thisClassName = "org.apache.spark.mllib.clustering.BisectingKMeansModel"
 
     def save(sc: SparkContext, model: BisectingKMeansModel, path: String): Unit = {
-      val sqlContext = SQLContext.getOrCreate(sc)
-      import sqlContext.implicits._
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)
           ~ ("rootId" -> model.root.index)))
@@ -154,8 +153,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
       val data = getNodes(model.root).map(node => Data(node.index, node.size,
         node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, 
node.height,
         node.children.map(_.index)))
-      val dataRDD = sc.parallelize(data).toDF()
-      dataRDD.write.parquet(Loader.dataPath(path))
+      spark.createDataFrame(data).write.parquet(Loader.dataPath(path))
     }
 
     private def getNodes(node: ClusteringTreeNode): Array[ClusteringTreeNode] = {
@@ -167,8 +165,8 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
     }
 
     def load(sc: SparkContext, path: String, rootId: Int): BisectingKMeansModel = {
-      val sqlContext = SQLContext.getOrCreate(sc)
-      val rows = sqlContext.read.parquet(Loader.dataPath(path))
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+      val rows = spark.read.parquet(Loader.dataPath(path))
       Loader.checkSchema[Data](rows.schema)
       val data = rows.select("index", "size", "center", "norm", "cost", "height", "children")
       val nodes = data.rdd.map(Data.apply).collect().map(d => (d.index, d)).toMap

http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
index f87613c..4b06816 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
@@ -29,7 +29,7 @@ import org.apache.spark.mllib.linalg.{Matrix, Vector}
 import org.apache.spark.mllib.stat.distribution.MultivariateGaussian
 import org.apache.spark.mllib.util.{Loader, MLUtils, Saveable}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
 
 /**
  * Multivariate Gaussian Mixture Model (GMM) consisting of k Gaussians, where points
@@ -143,9 +143,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
         path: String,
         weights: Array[Double],
         gaussians: Array[MultivariateGaussian]): Unit = {
-
-      val sqlContext = SQLContext.getOrCreate(sc)
-      import sqlContext.implicits._
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
 
       // Create JSON metadata.
       val metadata = compact(render
@@ -156,13 +154,13 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
       val dataArray = Array.tabulate(weights.length) { i =>
         Data(weights(i), gaussians(i).mu, gaussians(i).sigma)
       }
-      sc.parallelize(dataArray, 1).toDF().write.parquet(Loader.dataPath(path))
+      spark.createDataFrame(dataArray).repartition(1).write.parquet(Loader.dataPath(path))
     }
 
     def load(sc: SparkContext, path: String): GaussianMixtureModel = {
       val dataPath = Loader.dataPath(path)
-      val sqlContext = SQLContext.getOrCreate(sc)
-      val dataFrame = sqlContext.read.parquet(dataPath)
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+      val dataFrame = spark.read.parquet(dataPath)
       // Check schema explicitly since erasure makes it hard to use match-case for checking.
       Loader.checkSchema[Data](dataFrame.schema)
       val dataArray = dataFrame.select("weight", "mu", "sigma").collect()
@@ -172,7 +170,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
           (weight, new MultivariateGaussian(mu, sigma))
       }.unzip
 
-      new GaussianMixtureModel(weights.toArray, gaussians.toArray)
+      new GaussianMixtureModel(weights, gaussians)
     }
   }
 
@@ -189,7 +187,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
           s"GaussianMixtureModel requires weights of length $k " +
           s"got weights of length ${model.weights.length}")
         require(model.gaussians.length == k,
-          s"GaussianMixtureModel requires gaussians of length $k" +
+          s"GaussianMixtureModel requires gaussians of length $k " +
           s"got gaussians of length ${model.gaussians.length}")
         model
       case _ => throw new Exception(

http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index 439e4f8..5f939c1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -30,7 +30,7 @@ import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.pmml.PMMLExportable
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
 
 /**
  * A clustering model for K-means. Each point belongs to the cluster with the closest center.
@@ -123,25 +123,24 @@ object KMeansModel extends Loader[KMeansModel] {
     val thisClassName = "org.apache.spark.mllib.clustering.KMeansModel"
 
     def save(sc: SparkContext, model: KMeansModel, path: String): Unit = {
-      val sqlContext = SQLContext.getOrCreate(sc)
-      import sqlContext.implicits._
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("k" 
-> model.k)))
       sc.parallelize(Seq(metadata), 
1).saveAsTextFile(Loader.metadataPath(path))
       val dataRDD = sc.parallelize(model.clusterCenters.zipWithIndex).map { 
case (point, id) =>
         Cluster(id, point)
-      }.toDF()
-      dataRDD.write.parquet(Loader.dataPath(path))
+      }
+      spark.createDataFrame(dataRDD).write.parquet(Loader.dataPath(path))
     }
 
     def load(sc: SparkContext, path: String): KMeansModel = {
       implicit val formats = DefaultFormats
-      val sqlContext = SQLContext.getOrCreate(sc)
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
       val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
       assert(className == thisClassName)
       assert(formatVersion == thisFormatVersion)
       val k = (metadata \ "k").extract[Int]
-      val centroids = sqlContext.read.parquet(Loader.dataPath(path))
+      val centroids = spark.read.parquet(Loader.dataPath(path))
       Loader.checkSchema[Cluster](centroids.schema)
       val localCentroids = centroids.rdd.map(Cluster.apply).collect()
       assert(k == localCentroids.length)

http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index 4913c02..0a515f8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -31,7 +31,7 @@ import org.apache.spark.graphx.{Edge, EdgeContext, Graph, VertexId}
 import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors}
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.util.BoundedPriorityQueue
 
 /**
@@ -446,9 +446,7 @@ object LocalLDAModel extends Loader[LocalLDAModel] {
         docConcentration: Vector,
         topicConcentration: Double,
         gammaShape: Double): Unit = {
-      val sqlContext = SQLContext.getOrCreate(sc)
-      import sqlContext.implicits._
-
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
       val k = topicsMatrix.numCols
       val metadata = compact(render
         (("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
@@ -461,8 +459,8 @@ object LocalLDAModel extends Loader[LocalLDAModel] {
       val topicsDenseMatrix = topicsMatrix.toBreeze.toDenseMatrix
       val topics = Range(0, k).map { topicInd =>
        Data(Vectors.dense((topicsDenseMatrix(::, topicInd).toArray)), topicInd)
-      }.toSeq
-      sc.parallelize(topics, 1).toDF().write.parquet(Loader.dataPath(path))
+      }
+      spark.createDataFrame(topics).repartition(1).write.parquet(Loader.dataPath(path))
     }
 
     def load(
@@ -472,8 +470,8 @@ object LocalLDAModel extends Loader[LocalLDAModel] {
         topicConcentration: Double,
         gammaShape: Double): LocalLDAModel = {
       val dataPath = Loader.dataPath(path)
-      val sqlContext = SQLContext.getOrCreate(sc)
-      val dataFrame = sqlContext.read.parquet(dataPath)
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+      val dataFrame = spark.read.parquet(dataPath)
 
       Loader.checkSchema[Data](dataFrame.schema)
       val topics = dataFrame.collect()
@@ -853,8 +851,7 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] {
         topicConcentration: Double,
         iterationTimes: Array[Double],
         gammaShape: Double): Unit = {
-      val sqlContext = SQLContext.getOrCreate(sc)
-      import sqlContext.implicits._
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
 
       val metadata = compact(render
         (("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
@@ -866,18 +863,17 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] {
       sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
 
       val newPath = new Path(Loader.dataPath(path), "globalTopicTotals").toUri.toString
-      sc.parallelize(Seq(Data(Vectors.fromBreeze(globalTopicTotals)))).toDF()
-        .write.parquet(newPath)
+      spark.createDataFrame(Seq(Data(Vectors.fromBreeze(globalTopicTotals)))).write.parquet(newPath)
 
       val verticesPath = new Path(Loader.dataPath(path), "topicCounts").toUri.toString
-      graph.vertices.map { case (ind, vertex) =>
+      spark.createDataFrame(graph.vertices.map { case (ind, vertex) =>
         VertexData(ind, Vectors.fromBreeze(vertex))
-      }.toDF().write.parquet(verticesPath)
+      }).write.parquet(verticesPath)
 
       val edgesPath = new Path(Loader.dataPath(path), "tokenCounts").toUri.toString
-      graph.edges.map { case Edge(srcId, dstId, prop) =>
+      spark.createDataFrame(graph.edges.map { case Edge(srcId, dstId, prop) =>
         EdgeData(srcId, dstId, prop)
-      }.toDF().write.parquet(edgesPath)
+      }).write.parquet(edgesPath)
     }
 
     def load(
@@ -891,10 +887,10 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] {
       val dataPath = new Path(Loader.dataPath(path), "globalTopicTotals").toUri.toString
       val vertexDataPath = new Path(Loader.dataPath(path), "topicCounts").toUri.toString
       val edgeDataPath = new Path(Loader.dataPath(path), "tokenCounts").toUri.toString
-      val sqlContext = SQLContext.getOrCreate(sc)
-      val dataFrame = sqlContext.read.parquet(dataPath)
-      val vertexDataFrame = sqlContext.read.parquet(vertexDataPath)
-      val edgeDataFrame = sqlContext.read.parquet(edgeDataPath)
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+      val dataFrame = spark.read.parquet(dataPath)
+      val vertexDataFrame = spark.read.parquet(vertexDataPath)
+      val edgeDataFrame = spark.read.parquet(edgeDataPath)
 
       Loader.checkSchema[Data](dataFrame.schema)
       Loader.checkSchema[VertexData](vertexDataFrame.schema)

http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
index 2e257ff..51077bd 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
@@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.util.{Loader, MLUtils, Saveable}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.util.random.XORShiftRandom
 
 /**
@@ -70,28 +70,26 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode
 
     @Since("1.4.0")
     def save(sc: SparkContext, model: PowerIterationClusteringModel, path: String): Unit = {
-      val sqlContext = SQLContext.getOrCreate(sc)
-      import sqlContext.implicits._
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
 
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("k" 
-> model.k)))
       sc.parallelize(Seq(metadata), 
1).saveAsTextFile(Loader.metadataPath(path))
 
-      val dataRDD = model.assignments.toDF()
-      dataRDD.write.parquet(Loader.dataPath(path))
+      spark.createDataFrame(model.assignments).write.parquet(Loader.dataPath(path))
     }
 
     @Since("1.4.0")
     def load(sc: SparkContext, path: String): PowerIterationClusteringModel = {
       implicit val formats = DefaultFormats
-      val sqlContext = SQLContext.getOrCreate(sc)
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
 
       val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
       assert(className == thisClassName)
       assert(formatVersion == thisFormatVersion)
 
       val k = (metadata \ "k").extract[Int]
-      val assignments = sqlContext.read.parquet(Loader.dataPath(path))
+      val assignments = spark.read.parquet(Loader.dataPath(path))
       Loader.checkSchema[PowerIterationClustering.Assignment](assignments.schema)
 
       val assignmentsRDD = assignments.rdd.map {

http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
index 4f0e13f..13decef 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
@@ -30,7 +30,7 @@ import org.apache.spark.mllib.stat.Statistics
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
 
 /**
  * Chi Squared selector model.
@@ -134,8 +134,8 @@ object ChiSqSelectorModel extends Loader[ChiSqSelectorModel] {
     val thisClassName = "org.apache.spark.mllib.feature.ChiSqSelectorModel"
 
     def save(sc: SparkContext, model: ChiSqSelectorModel, path: String): Unit = {
-      val sqlContext = SQLContext.getOrCreate(sc)
-      import sqlContext.implicits._
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)))
       sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
@@ -144,18 +144,17 @@ object ChiSqSelectorModel extends Loader[ChiSqSelectorModel] {
       val dataArray = Array.tabulate(model.selectedFeatures.length) { i =>
         Data(model.selectedFeatures(i))
       }
-      sc.parallelize(dataArray, 1).toDF().write.parquet(Loader.dataPath(path))
-
+      spark.createDataFrame(dataArray).repartition(1).write.parquet(Loader.dataPath(path))
     }
 
     def load(sc: SparkContext, path: String): ChiSqSelectorModel = {
       implicit val formats = DefaultFormats
-      val sqlContext = SQLContext.getOrCreate(sc)
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
       val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
       assert(className == thisClassName)
       assert(formatVersion == thisFormatVersion)
 
-      val dataFrame = sqlContext.read.parquet(Loader.dataPath(path))
+      val dataFrame = spark.read.parquet(Loader.dataPath(path))
       val dataArray = dataFrame.select("feature")
 
       // Check schema explicitly since erasure makes it hard to use match-case for checking.

http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index 7e6c367..9bd79aa 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -34,7 +34,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd._
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.util.Utils
 import org.apache.spark.util.random.XORShiftRandom
 
@@ -609,9 +609,8 @@ object Word2VecModel extends Loader[Word2VecModel] {
     case class Data(word: String, vector: Array[Float])
 
     def load(sc: SparkContext, path: String): Word2VecModel = {
-      val dataPath = Loader.dataPath(path)
-      val sqlContext = SQLContext.getOrCreate(sc)
-      val dataFrame = sqlContext.read.parquet(dataPath)
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+      val dataFrame = spark.read.parquet(Loader.dataPath(path))
       // Check schema explicitly since erasure makes it hard to use match-case for checking.
       Loader.checkSchema[Data](dataFrame.schema)
 
@@ -621,9 +620,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
     }
 
     def save(sc: SparkContext, path: String, model: Map[String, Array[Float]]): Unit = {
-
-      val sqlContext = SQLContext.getOrCreate(sc)
-      import sqlContext.implicits._
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
 
       val vectorSize = model.values.head.length
       val numWords = model.size
@@ -641,7 +638,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
       val approxSize = 4L * numWords * vectorSize
       val nPartitions = ((approxSize / partitionSize) + 1).toInt
       val dataArray = model.toSeq.map { case (w, v) => Data(w, v) }
-      sc.parallelize(dataArray.toSeq, nPartitions).toDF().write.parquet(Loader.dataPath(path))
+      spark.createDataFrame(dataArray).repartition(nPartitions).write.parquet(Loader.dataPath(path))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
index 28e4966..8c0639b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
@@ -37,7 +37,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.mllib.fpm.FPGrowth._
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
@@ -99,7 +99,7 @@ object FPGrowthModel extends Loader[FPGrowthModel[_]] {
 
     def save(model: FPGrowthModel[_], path: String): Unit = {
       val sc = model.freqItemsets.sparkContext
-      val sqlContext = SQLContext.getOrCreate(sc)
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
 
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)))
@@ -118,18 +118,18 @@ object FPGrowthModel extends Loader[FPGrowthModel[_]] {
       val rowDataRDD = model.freqItemsets.map { x =>
         Row(x.items.toSeq, x.freq)
       }
-      sqlContext.createDataFrame(rowDataRDD, schema).write.parquet(Loader.dataPath(path))
+      spark.createDataFrame(rowDataRDD, schema).write.parquet(Loader.dataPath(path))
     }
 
     def load(sc: SparkContext, path: String): FPGrowthModel[_] = {
       implicit val formats = DefaultFormats
-      val sqlContext = SQLContext.getOrCreate(sc)
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
 
       val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
       assert(className == thisClassName)
       assert(formatVersion == thisFormatVersion)
 
-      val freqItemsets = sqlContext.read.parquet(Loader.dataPath(path))
+      val freqItemsets = spark.read.parquet(Loader.dataPath(path))
       val sample = freqItemsets.select("items").head().get(0)
       loadImpl(freqItemsets, sample)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 4344ab1..10bbcd2 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -36,7 +36,7 @@ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.internal.Logging
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
@@ -616,7 +616,7 @@ object PrefixSpanModel extends Loader[PrefixSpanModel[_]] {
 
     def save(model: PrefixSpanModel[_], path: String): Unit = {
       val sc = model.freqSequences.sparkContext
-      val sqlContext = SQLContext.getOrCreate(sc)
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
 
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)))
@@ -635,18 +635,18 @@ object PrefixSpanModel extends Loader[PrefixSpanModel[_]] {
       val rowDataRDD = model.freqSequences.map { x =>
         Row(x.sequence, x.freq)
       }
-      sqlContext.createDataFrame(rowDataRDD, schema).write.parquet(Loader.dataPath(path))
+      spark.createDataFrame(rowDataRDD, schema).write.parquet(Loader.dataPath(path))
     }
 
     def load(sc: SparkContext, path: String): PrefixSpanModel[_] = {
       implicit val formats = DefaultFormats
-      val sqlContext = SQLContext.getOrCreate(sc)
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
 
       val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
       assert(className == thisClassName)
       assert(formatVersion == thisFormatVersion)
 
-      val freqSequences = sqlContext.read.parquet(Loader.dataPath(path))
+      val freqSequences = spark.read.parquet(Loader.dataPath(path))
       val sample = freqSequences.select("sequence").head().get(0)
       loadImpl(freqSequences, sample)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 6f780b0..450025f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -37,7 +37,7 @@ import org.apache.spark.mllib.linalg._
 import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.storage.StorageLevel
 
 /**
@@ -354,8 +354,8 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
      */
     def save(model: MatrixFactorizationModel, path: String): Unit = {
       val sc = model.userFeatures.sparkContext
-      val sqlContext = SQLContext.getOrCreate(sc)
-      import sqlContext.implicits._
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+      import spark.implicits._
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ 
("rank" -> model.rank)))
       sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
@@ -365,16 +365,16 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
 
     def load(sc: SparkContext, path: String): MatrixFactorizationModel = {
       implicit val formats = DefaultFormats
-      val sqlContext = SQLContext.getOrCreate(sc)
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
       val (className, formatVersion, metadata) = loadMetadata(sc, path)
       assert(className == thisClassName)
       assert(formatVersion == thisFormatVersion)
       val rank = (metadata \ "rank").extract[Int]
-      val userFeatures = sqlContext.read.parquet(userPath(path)).rdd.map {
+      val userFeatures = spark.read.parquet(userPath(path)).rdd.map {
         case Row(id: Int, features: Seq[_]) =>
           (id, features.asInstanceOf[Seq[Double]].toArray)
       }
-      val productFeatures = sqlContext.read.parquet(productPath(path)).rdd.map {
+      val productFeatures = spark.read.parquet(productPath(path)).rdd.map {
         case Row(id: Int, features: Seq[_]) =>
           (id, features.asInstanceOf[Seq[Double]].toArray)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index abdd798..215a799 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -34,7 +34,7 @@ import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
 
 /**
  * Regression model for isotonic regression.
@@ -185,21 +185,21 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
         boundaries: Array[Double],
         predictions: Array[Double],
         isotonic: Boolean): Unit = {
-      val sqlContext = SQLContext.getOrCreate(sc)
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
 
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
           ("isotonic" -> isotonic)))
       sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
 
-      sqlContext.createDataFrame(
+      spark.createDataFrame(
         boundaries.toSeq.zip(predictions).map { case (b, p) => Data(b, p) }
       ).write.parquet(dataPath(path))
     }
 
     def load(sc: SparkContext, path: String): (Array[Double], Array[Double]) = {
-      val sqlContext = SQLContext.getOrCreate(sc)
-      val dataRDD = sqlContext.read.parquet(dataPath(path))
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+      val dataRDD = spark.read.parquet(dataPath(path))
 
       checkSchema[Data](dataRDD.schema)
       val dataArray = dataRDD.select("boundary", "prediction").collect()
@@ -221,7 +221,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
         val (boundaries, predictions) = SaveLoadV1_0.load(sc, path)
         new IsotonicRegressionModel(boundaries, predictions, isotonic)
       case _ => throw new Exception(
-        s"IsotonicRegressionModel.load did not recognize model with (className, format version):" +
+        s"IsotonicRegressionModel.load did not recognize model with (className, format version): " +
         s"($loadedClassName, $version).  Supported:\n" +
         s"  ($classNameV1_0, 1.0)"
       )

http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
index 7696fdf..3c7bbc5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
@@ -23,7 +23,7 @@ import org.json4s.jackson.JsonMethods._
 import org.apache.spark.SparkContext
 import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.util.Loader
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
 
 /**
  * Helper methods for import/export of GLM regression models.
@@ -47,7 +47,7 @@ private[regression] object GLMRegressionModel {
         modelClass: String,
         weights: Vector,
         intercept: Double): Unit = {
-      val sqlContext = SQLContext.getOrCreate(sc)
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
 
       // Create JSON metadata.
       val metadata = compact(render(
@@ -57,7 +57,7 @@ private[regression] object GLMRegressionModel {
 
       // Create Parquet data.
       val data = Data(weights, intercept)
-      sqlContext.createDataFrame(Seq(data)).repartition(1).write.parquet(Loader.dataPath(path))
+      spark.createDataFrame(Seq(data)).repartition(1).write.parquet(Loader.dataPath(path))
     }
 
     /**
@@ -67,17 +67,17 @@ private[regression] object GLMRegressionModel {
      *                     The length of the weights vector should equal numFeatures.
      */
     def loadData(sc: SparkContext, path: String, modelClass: String, numFeatures: Int): Data = {
-      val datapath = Loader.dataPath(path)
-      val sqlContext = SQLContext.getOrCreate(sc)
-      val dataRDD = sqlContext.read.parquet(datapath)
+      val dataPath = Loader.dataPath(path)
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+      val dataRDD = spark.read.parquet(dataPath)
       val dataArray = dataRDD.select("weights", "intercept").take(1)
-      assert(dataArray.length == 1, s"Unable to load $modelClass data from: $datapath")
+      assert(dataArray.length == 1, s"Unable to load $modelClass data from: $dataPath")
       val data = dataArray(0)
-      assert(data.size == 2, s"Unable to load $modelClass data from: $datapath")
+      assert(data.size == 2, s"Unable to load $modelClass data from: $dataPath")
       data match {
         case Row(weights: Vector, intercept: Double) =>
           assert(weights.size == numFeatures, s"Expected $numFeatures features, but" +
-            s" found ${weights.size} features when loading $modelClass weights from $datapath")
+            s" found ${weights.size} features when loading $modelClass weights from $dataPath")
           Data(weights, intercept)
       }
     }
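
The load path above works because the weights column round-trips through the Vector UDT, so the Row can be pattern matched directly. A standalone sketch under the same assumptions; `GlmDataIO` and its `Data` case class are illustrative stand-ins for the private helpers, not the patched code itself.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.{Row, SparkSession}

object GlmDataIO {
  case class Data(weights: Vector, intercept: Double)

  def save(sc: SparkContext, path: String, weights: Vector, intercept: Double): Unit = {
    val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
    // A single small file is enough for model coefficients, hence repartition(1).
    spark.createDataFrame(Seq(Data(weights, intercept)))
      .repartition(1).write.parquet(path)
  }

  def load(sc: SparkContext, path: String): Data = {
    val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
    spark.read.parquet(path).select("weights", "intercept").take(1) match {
      // The Vector column is read back through its UDT, so it matches as a Vector here.
      case Array(Row(w: Vector, b: Double)) => Data(w, b)
      case other => sys.error(s"Unexpected contents at $path: ${other.toSeq}")
    }
  }
}
```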

http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
index c13b9a6..7266318 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
@@ -32,7 +32,7 @@ import org.apache.spark.mllib.tree.configuration.{Algo, FeatureType}
 import org.apache.spark.mllib.tree.configuration.Algo._
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.util.Utils
 
 /**
@@ -202,9 +202,6 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging {
     }
 
     def save(sc: SparkContext, path: String, model: DecisionTreeModel): Unit = {
-      val sqlContext = SQLContext.getOrCreate(sc)
-      import sqlContext.implicits._
-
       // SPARK-6120: We do a hacky check here so users understand why save() is failing
       //             when they run the ML guide example.
       // TODO: Fix this issue for real.
@@ -235,17 +232,16 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging {
 
       // Create Parquet data.
       val nodes = model.topNode.subtreeIterator.toSeq
-      val dataRDD: DataFrame = sc.parallelize(nodes)
-        .map(NodeData.apply(0, _))
-        .toDF()
-      dataRDD.write.parquet(Loader.dataPath(path))
+      val dataRDD = sc.parallelize(nodes).map(NodeData.apply(0, _))
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+      spark.createDataFrame(dataRDD).write.parquet(Loader.dataPath(path))
     }
 
     def load(sc: SparkContext, path: String, algo: String, numNodes: Int): DecisionTreeModel = {
-      val datapath = Loader.dataPath(path)
-      val sqlContext = SQLContext.getOrCreate(sc)
       // Load Parquet data.
-      val dataRDD = sqlContext.read.parquet(datapath)
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+      val dataPath = Loader.dataPath(path)
+      val dataRDD = spark.read.parquet(dataPath)
       // Check schema explicitly since erasure makes it hard to use match-case for checking.
       Loader.checkSchema[NodeData](dataRDD.schema)
       val nodes = dataRDD.rdd.map(NodeData.apply)
@@ -254,7 +250,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging {
       assert(trees.length == 1,
         "Decision tree should contain exactly one tree but got ${trees.size} trees.")
       val model = new DecisionTreeModel(trees(0), Algo.fromString(algo))
-      assert(model.numNodes == numNodes, s"Unable to load DecisionTreeModel data from: $datapath." +
+      assert(model.numNodes == numNodes, s"Unable to load DecisionTreeModel data from: $dataPath." +
         s" Expected $numNodes nodes but found ${model.numNodes}")
       model
     }
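
Note that `spark.createDataFrame(rdd)` derives the schema from the case class via reflection, which is why the old `import sqlContext.implicits._` / `.toDF()` pair could be dropped in the save path above. A minimal sketch of that pattern; `NodeIO` and `NodeRecord` are made-up names standing in for the private `NodeData`.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object NodeIO {
  case class NodeRecord(treeId: Int, nodeId: Int, prediction: Double)

  def writeNodes(sc: SparkContext, path: String, nodes: Seq[NodeRecord]): Unit = {
    val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
    // The schema is inferred from NodeRecord; no implicits import is required here.
    spark.createDataFrame(sc.parallelize(nodes)).write.parquet(path)
  }
}
```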

http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
index cbf49b6..c653b98 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
@@ -36,7 +36,7 @@ import org.apache.spark.mllib.tree.configuration.EnsembleCombiningStrategy._
 import org.apache.spark.mllib.tree.loss.Loss
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.util.Utils
 
 /**
@@ -413,8 +413,7 @@ private[tree] object TreeEnsembleModel extends Logging {
     case class EnsembleNodeData(treeId: Int, node: NodeData)
 
     def save(sc: SparkContext, path: String, model: TreeEnsembleModel, className: String): Unit = {
-      val sqlContext = SQLContext.getOrCreate(sc)
-      import sqlContext.implicits._
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
 
       // SPARK-6120: We do a hacky check here so users understand why save() is failing
       //             when they run the ML guide example.
@@ -450,8 +449,8 @@ private[tree] object TreeEnsembleModel extends Logging {
       // Create Parquet data.
       val dataRDD = sc.parallelize(model.trees.zipWithIndex).flatMap { case (tree, treeId) =>
         tree.topNode.subtreeIterator.toSeq.map(node => NodeData(treeId, node))
-      }.toDF()
-      dataRDD.write.parquet(Loader.dataPath(path))
+      }
+      spark.createDataFrame(dataRDD).write.parquet(Loader.dataPath(path))
     }
 
     /**
@@ -472,10 +471,10 @@ private[tree] object TreeEnsembleModel extends Logging {
         sc: SparkContext,
         path: String,
         treeAlgo: String): Array[DecisionTreeModel] = {
-      val datapath = Loader.dataPath(path)
-      val sqlContext = SQLContext.getOrCreate(sc)
-      val nodes = sqlContext.read.parquet(datapath).rdd.map(NodeData.apply)
-      val trees = constructTrees(nodes)
+      val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+      import spark.implicits._
+      val nodes = spark.read.parquet(Loader.dataPath(path)).map(NodeData.apply)
+      val trees = constructTrees(nodes.rdd)
       trees.map(new DecisionTreeModel(_, Algo.fromString(treeAlgo)))
     }
   }
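
The load path keeps `import spark.implicits._` because mapping the loaded DataFrame needs an Encoder. The patch maps rows with `NodeData.apply`; below is a rough equivalent that goes through a typed Dataset instead, assuming the Parquet columns match the case-class fields. `EnsembleNodeIO` and `NodeRecord` are illustrative names only.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object EnsembleNodeIO {
  case class NodeRecord(treeId: Int, nodeId: Int, prediction: Double)

  def readNodes(sc: SparkContext, path: String): RDD[NodeRecord] = {
    val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
    // The implicits provide the product Encoder used by .as[NodeRecord].
    import spark.implicits._
    spark.read.parquet(path).as[NodeRecord].rdd
  }
}
```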

http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
index 8895d63..621c13a 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
@@ -20,15 +20,15 @@ package org.apache.spark.ml.feature
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.util.DefaultReadWriteTest
 import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions.udf
 
 class QuantileDiscretizerSuite
   extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
   test("Test observed number of buckets and their sizes match expected values") {
-    val sqlCtx = SQLContext.getOrCreate(sc)
-    import sqlCtx.implicits._
+    val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+    import spark.implicits._
 
     val datasetSize = 100000
     val numBuckets = 5
@@ -53,8 +53,8 @@ class QuantileDiscretizerSuite
   }
 
   test("Test transform method on unseen data") {
-    val sqlCtx = SQLContext.getOrCreate(sc)
-    import sqlCtx.implicits._
+    val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+    import spark.implicits._
 
     val trainDF = sc.parallelize(1.0 to 100.0 by 1.0).map(Tuple1.apply).toDF("input")
     val testDF = sc.parallelize(-10.0 to 110.0 by 1.0).map(Tuple1.apply).toDF("input")
@@ -82,8 +82,8 @@ class QuantileDiscretizerSuite
   }
 
   test("Verify resulting model has parent") {
-    val sqlCtx = SQLContext.getOrCreate(sc)
-    import sqlCtx.implicits._
+    val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+    import spark.implicits._
 
     val df = sc.parallelize(1 to 100).map(Tuple1.apply).toDF("input")
     val discretizer = new QuantileDiscretizer()
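
In the tests the session is rebuilt from `sc.getConf`, and `toDF` still comes from the session's implicits. A self-contained sketch of that setup; `DiscretizerExample` and `inputFrame` are illustrative names, not part of the suite.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}

object DiscretizerExample {
  def inputFrame(sc: SparkContext): DataFrame = {
    val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
    // toDF on an RDD of Tuple1[Double] is provided by the session's implicits.
    import spark.implicits._
    sc.parallelize(1.0 to 100.0 by 1.0).map(Tuple1.apply).toDF("input")
  }
}
```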

http://git-wip-us.apache.org/repos/asf/spark/blob/a355edee/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala
index ba8d36f..db56aff 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala
@@ -21,9 +21,9 @@ import java.io.File
 
 import org.scalatest.Suite
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkContext
 import org.apache.spark.ml.util.TempDirectory
-import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.util.Utils
 
 trait MLlibTestSparkContext extends TempDirectory { self: Suite =>
@@ -46,7 +46,7 @@ trait MLlibTestSparkContext extends TempDirectory { self: Suite =>
   override def afterAll() {
     try {
       Utils.deleteRecursively(new File(checkpointDir))
-      SQLContext.clearActive()
+      SparkSession.clearActiveSession()
       if (spark != null) {
         spark.stop()
       }
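
The teardown now clears the thread-local active session before stopping it. A minimal sketch of that shutdown order, assuming a `SparkSession` named `spark`; `SessionTeardown` is an illustrative wrapper, not the trait itself.

```scala
import org.apache.spark.sql.SparkSession

object SessionTeardown {
  def shutdown(spark: SparkSession): Unit = {
    // Clear the active session first, then stop the session,
    // which also stops its underlying SparkContext.
    SparkSession.clearActiveSession()
    if (spark != null) {
      spark.stop()
    }
  }
}
```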

