Repository: spark
Updated Branches:
  refs/heads/master 50b89d05b -> f234b7cd7


http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
index b478fea..a6bbb94 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
@@ -29,6 +29,8 @@ import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructTy
 class StringIndexerSuite
   extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
+  import testImplicits._
+
   test("params") {
     ParamsSuite.checkParams(new StringIndexer)
     val model = new StringIndexerModel("indexer", Array("a", "b"))
@@ -38,8 +40,8 @@ class StringIndexerSuite
   }
 
   test("StringIndexer") {
-    val data = sc.parallelize(Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, 
"a"), (5, "c")), 2)
-    val df = spark.createDataFrame(data).toDF("id", "label")
+    val data = Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
+    val df = data.toDF("id", "label")
     val indexer = new StringIndexer()
       .setInputCol("label")
       .setOutputCol("labelIndex")
@@ -61,10 +63,10 @@ class StringIndexerSuite
   }
 
   test("StringIndexerUnseen") {
-    val data = sc.parallelize(Seq((0, "a"), (1, "b"), (4, "b")), 2)
-    val data2 = sc.parallelize(Seq((0, "a"), (1, "b"), (2, "c")), 2)
-    val df = spark.createDataFrame(data).toDF("id", "label")
-    val df2 = spark.createDataFrame(data2).toDF("id", "label")
+    val data = Seq((0, "a"), (1, "b"), (4, "b"))
+    val data2 = Seq((0, "a"), (1, "b"), (2, "c"))
+    val df = data.toDF("id", "label")
+    val df2 = data2.toDF("id", "label")
     val indexer = new StringIndexer()
       .setInputCol("label")
       .setOutputCol("labelIndex")
@@ -92,8 +94,8 @@ class StringIndexerSuite
   }
 
   test("StringIndexer with a numeric input column") {
-    val data = sc.parallelize(Seq((0, 100), (1, 200), (2, 300), (3, 100), (4, 100), (5, 300)), 2)
-    val df = spark.createDataFrame(data).toDF("id", "label")
+    val data = Seq((0, 100), (1, 200), (2, 300), (3, 100), (4, 100), (5, 300))
+    val df = data.toDF("id", "label")
     val indexer = new StringIndexer()
       .setInputCol("label")
       .setOutputCol("labelIndex")
@@ -119,7 +121,7 @@ class StringIndexerSuite
   }
 
   test("StringIndexerModel can't overwrite output column") {
-    val df = spark.createDataFrame(Seq((1, 2), (3, 4))).toDF("input", "output")
+    val df = Seq((1, 2), (3, 4)).toDF("input", "output")
     intercept[IllegalArgumentException] {
       new StringIndexer()
         .setInputCol("input")
@@ -161,9 +163,7 @@ class StringIndexerSuite
 
   test("IndexToString.transform") {
     val labels = Array("a", "b", "c")
-    val df0 = spark.createDataFrame(Seq(
-      (0, "a"), (1, "b"), (2, "c"), (0, "a")
-    )).toDF("index", "expected")
+    val df0 = Seq((0, "a"), (1, "b"), (2, "c"), (0, "a")).toDF("index", 
"expected")
 
     val idxToStr0 = new IndexToString()
       .setInputCol("index")
@@ -187,8 +187,8 @@ class StringIndexerSuite
   }
 
   test("StringIndexer, IndexToString are inverses") {
-    val data = sc.parallelize(Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, 
"a"), (5, "c")), 2)
-    val df = spark.createDataFrame(data).toDF("id", "label")
+    val data = Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
+    val df = data.toDF("id", "label")
     val indexer = new StringIndexer()
       .setInputCol("label")
       .setOutputCol("labelIndex")
@@ -220,8 +220,8 @@ class StringIndexerSuite
   }
 
   test("StringIndexer metadata") {
-    val data = sc.parallelize(Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, 
"a"), (5, "c")), 2)
-    val df = spark.createDataFrame(data).toDF("id", "label")
+    val data = Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
+    val df = data.toDF("id", "label")
     val indexer = new StringIndexer()
       .setInputCol("label")
       .setOutputCol("labelIndex")

http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala
index f30bdc3..c895659 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala
@@ -46,6 +46,7 @@ class RegexTokenizerSuite
   extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
   import org.apache.spark.ml.feature.RegexTokenizerSuite._
+  import testImplicits._
 
   test("params") {
     ParamsSuite.checkParams(new RegexTokenizer)
@@ -57,26 +58,26 @@ class RegexTokenizerSuite
       .setPattern("\\w+|\\p{Punct}")
       .setInputCol("rawText")
       .setOutputCol("tokens")
-    val dataset0 = spark.createDataFrame(Seq(
+    val dataset0 = Seq(
       TokenizerTestData("Test for tokenization.", Array("test", "for", 
"tokenization", ".")),
       TokenizerTestData("Te,st. punct", Array("te", ",", "st", ".", "punct"))
-    ))
+    ).toDF()
     testRegexTokenizer(tokenizer0, dataset0)
 
-    val dataset1 = spark.createDataFrame(Seq(
+    val dataset1 = Seq(
       TokenizerTestData("Test for tokenization.", Array("test", "for", 
"tokenization")),
       TokenizerTestData("Te,st. punct", Array("punct"))
-    ))
+    ).toDF()
     tokenizer0.setMinTokenLength(3)
     testRegexTokenizer(tokenizer0, dataset1)
 
     val tokenizer2 = new RegexTokenizer()
       .setInputCol("rawText")
       .setOutputCol("tokens")
-    val dataset2 = spark.createDataFrame(Seq(
+    val dataset2 = Seq(
       TokenizerTestData("Test for tokenization.", Array("test", "for", 
"tokenization.")),
       TokenizerTestData("Te,st.  punct", Array("te,st.", "punct"))
-    ))
+    ).toDF()
     testRegexTokenizer(tokenizer2, dataset2)
   }
 
@@ -85,10 +86,10 @@ class RegexTokenizerSuite
       .setInputCol("rawText")
       .setOutputCol("tokens")
       .setToLowercase(false)
-    val dataset = spark.createDataFrame(Seq(
+    val dataset = Seq(
       TokenizerTestData("JAVA SCALA", Array("JAVA", "SCALA")),
       TokenizerTestData("java scala", Array("java", "scala"))
-    ))
+    ).toDF()
     testRegexTokenizer(tokenizer, dataset)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala
index 561493f..46cced3 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala
@@ -29,6 +29,8 @@ import org.apache.spark.sql.functions.col
 class VectorAssemblerSuite
   extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
+  import testImplicits._
+
   test("params") {
     ParamsSuite.checkParams(new VectorAssembler)
   }
@@ -57,9 +59,9 @@ class VectorAssemblerSuite
   }
 
   test("VectorAssembler") {
-    val df = spark.createDataFrame(Seq(
+    val df = Seq(
       (0, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 10L)
-    )).toDF("id", "x", "y", "name", "z", "n")
+    ).toDF("id", "x", "y", "name", "z", "n")
     val assembler = new VectorAssembler()
       .setInputCols(Array("x", "y", "z", "n"))
       .setOutputCol("features")
@@ -70,7 +72,7 @@ class VectorAssemblerSuite
   }
 
   test("transform should throw an exception in case of unsupported type") {
-    val df = spark.createDataFrame(Seq(("a", "b", "c"))).toDF("a", "b", "c")
+    val df = Seq(("a", "b", "c")).toDF("a", "b", "c")
     val assembler = new VectorAssembler()
       .setInputCols(Array("a", "b", "c"))
       .setOutputCol("features")
@@ -87,7 +89,7 @@ class VectorAssemblerSuite
       NominalAttribute.defaultAttr.withName("gender").withValues("male", "female"),
       NumericAttribute.defaultAttr.withName("salary")))
     val row = (1.0, 0.5, 1, Vectors.dense(1.0, 1000.0), Vectors.sparse(2, Array(1), Array(2.0)))
-    val df = spark.createDataFrame(Seq(row)).toDF("browser", "hour", "count", "user", "ad")
+    val df = Seq(row).toDF("browser", "hour", "count", "user", "ad")
       .select(
         col("browser").as("browser", browser.toMetadata()),
         col("hour").as("hour", hour.toMetadata()),

http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
index 7071423..4da1b13 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.DataFrame
 class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext
   with DefaultReadWriteTest with Logging {
 
+  import testImplicits._
   import VectorIndexerSuite.FeatureData
 
   // identical, of length 3
@@ -85,11 +86,13 @@ class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext
     checkPair(densePoints1Seq, sparsePoints1Seq)
     checkPair(densePoints2Seq, sparsePoints2Seq)
 
-    densePoints1 = spark.createDataFrame(sc.parallelize(densePoints1Seq, 2).map(FeatureData))
-    sparsePoints1 = spark.createDataFrame(sc.parallelize(sparsePoints1Seq, 2).map(FeatureData))
-    densePoints2 = spark.createDataFrame(sc.parallelize(densePoints2Seq, 2).map(FeatureData))
-    sparsePoints2 = spark.createDataFrame(sc.parallelize(sparsePoints2Seq, 2).map(FeatureData))
-    badPoints = spark.createDataFrame(sc.parallelize(badPointsSeq, 2).map(FeatureData))
+    densePoints1 = densePoints1Seq.map(FeatureData).toDF()
+    sparsePoints1 = sparsePoints1Seq.map(FeatureData).toDF()
+    // TODO: If we directly use `toDF` without parallelize, the test in
+    // "Throws error when given RDDs with different size vectors" is failed 
for an unknown reason.
+    densePoints2 = sc.parallelize(densePoints2Seq, 2).map(FeatureData).toDF()
+    sparsePoints2 = sparsePoints2Seq.map(FeatureData).toDF()
+    badPoints = badPointsSeq.map(FeatureData).toDF()
   }
 
   private def getIndexer: VectorIndexer =
@@ -102,7 +105,7 @@ class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext
   }
 
   test("Cannot fit an empty DataFrame") {
-    val rdd = spark.createDataFrame(sc.parallelize(Array.empty[Vector], 2).map(FeatureData))
+    val rdd = Array.empty[Vector].map(FeatureData).toSeq.toDF()
     val vectorIndexer = getIndexer
     intercept[IllegalArgumentException] {
       vectorIndexer.fit(rdd)

http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala
index 1c70b70..0fdfdf3 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala
@@ -31,23 +31,22 @@ import org.apache.spark.sql.{DataFrame, Row}
 class AFTSurvivalRegressionSuite
   extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
+  import testImplicits._
+
   @transient var datasetUnivariate: DataFrame = _
   @transient var datasetMultivariate: DataFrame = _
   @transient var datasetUnivariateScaled: DataFrame = _
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    datasetUnivariate = spark.createDataFrame(
-      sc.parallelize(generateAFTInput(
-        1, Array(5.5), Array(0.8), 1000, 42, 1.0, 2.0, 2.0)))
-    datasetMultivariate = spark.createDataFrame(
-      sc.parallelize(generateAFTInput(
-        2, Array(0.9, -1.3), Array(0.7, 1.2), 1000, 42, 1.5, 2.5, 2.0)))
-    datasetUnivariateScaled = spark.createDataFrame(
-      sc.parallelize(generateAFTInput(
-        1, Array(5.5), Array(0.8), 1000, 42, 1.0, 2.0, 2.0)).map { x =>
-          AFTPoint(Vectors.dense(x.features(0) * 1.0E3), x.label, x.censor)
-      })
+    datasetUnivariate = generateAFTInput(
+      1, Array(5.5), Array(0.8), 1000, 42, 1.0, 2.0, 2.0).toDF()
+    datasetMultivariate = generateAFTInput(
+      2, Array(0.9, -1.3), Array(0.7, 1.2), 1000, 42, 1.5, 2.5, 2.0).toDF()
+    datasetUnivariateScaled = sc.parallelize(
+      generateAFTInput(1, Array(5.5), Array(0.8), 1000, 42, 1.0, 2.0, 2.0)).map { x =>
+        AFTPoint(Vectors.dense(x.features(0) * 1.0E3), x.label, x.censor)
+      }.toDF()
   }
 
   /**
@@ -396,9 +395,8 @@ class AFTSurvivalRegressionSuite
     // the parallelism is bigger than that. Because the issue was about `AFTAggregator`s
     // being merged incorrectly when it has an empty partition, running the code below
     // should not throw an exception.
-    val dataset = spark.createDataFrame(
-      sc.parallelize(generateAFTInput(
-        1, Array(5.5), Array(0.8), 2, 42, 1.0, 2.0, 2.0), numSlices = 3))
+    val dataset = sc.parallelize(generateAFTInput(
+      1, Array(5.5), Array(0.8), 2, 42, 1.0, 2.0, 2.0), numSlices = 3).toDF()
     val trainer = new AFTSurvivalRegression()
     trainer.fit(dataset)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala
index 7b5df8f..dcf3f9a 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala
@@ -37,6 +37,7 @@ class GBTRegressorSuite extends SparkFunSuite with MLlibTestSparkContext
   with DefaultReadWriteTest {
 
   import GBTRegressorSuite.compareAPIs
+  import testImplicits._
 
   // Combinations for estimators, learning rates and subsamplingRate
   private val testCombinations =
@@ -76,14 +77,14 @@ class GBTRegressorSuite extends SparkFunSuite with MLlibTestSparkContext
   }
 
   test("GBTRegressor behaves reasonably on toy data") {
-    val df = spark.createDataFrame(Seq(
+    val df = Seq(
       LabeledPoint(10, Vectors.dense(1, 2, 3, 4)),
       LabeledPoint(-5, Vectors.dense(6, 3, 2, 1)),
       LabeledPoint(11, Vectors.dense(2, 2, 3, 4)),
       LabeledPoint(-6, Vectors.dense(6, 4, 2, 1)),
       LabeledPoint(9, Vectors.dense(1, 2, 6, 4)),
       LabeledPoint(-4, Vectors.dense(6, 3, 2, 2))
-    ))
+    ).toDF()
     val gbt = new GBTRegressor()
       .setMaxDepth(2)
       .setMaxIter(2)
@@ -103,7 +104,7 @@ class GBTRegressorSuite extends SparkFunSuite with MLlibTestSparkContext
     val path = tempDir.toURI.toString
     sc.setCheckpointDir(path)
 
-    val df = spark.createDataFrame(data)
+    val df = data.toDF()
     val gbt = new GBTRegressor()
       .setMaxDepth(2)
       .setMaxIter(5)

http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala
index d8032c4..937aa7d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala
@@ -35,6 +35,8 @@ import org.apache.spark.sql.functions._
 class GeneralizedLinearRegressionSuite
   extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
+  import testImplicits._
+
   private val seed: Int = 42
   @transient var datasetGaussianIdentity: DataFrame = _
   @transient var datasetGaussianLog: DataFrame = _
@@ -52,23 +54,20 @@ class GeneralizedLinearRegressionSuite
 
     import GeneralizedLinearRegressionSuite._
 
-    datasetGaussianIdentity = spark.createDataFrame(
-      sc.parallelize(generateGeneralizedLinearRegressionInput(
-        intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
-        xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
-        family = "gaussian", link = "identity"), 2))
+    datasetGaussianIdentity = generateGeneralizedLinearRegressionInput(
+      intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
+      xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
+      family = "gaussian", link = "identity").toDF()
 
-    datasetGaussianLog = spark.createDataFrame(
-      sc.parallelize(generateGeneralizedLinearRegressionInput(
-        intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5),
-        xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
-        family = "gaussian", link = "log"), 2))
+    datasetGaussianLog = generateGeneralizedLinearRegressionInput(
+      intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5),
+      xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
+      family = "gaussian", link = "log").toDF()
 
-    datasetGaussianInverse = spark.createDataFrame(
-      sc.parallelize(generateGeneralizedLinearRegressionInput(
-        intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
-        xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
-        family = "gaussian", link = "inverse"), 2))
+    datasetGaussianInverse = generateGeneralizedLinearRegressionInput(
+      intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
+      xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
+      family = "gaussian", link = "inverse").toDF()
 
     datasetBinomial = {
       val nPoints = 10000
@@ -80,44 +79,38 @@ class GeneralizedLinearRegressionSuite
         generateMultinomialLogisticInput(coefficients, xMean, xVariance,
           addIntercept = true, nPoints, seed)
 
-      spark.createDataFrame(sc.parallelize(testData, 2))
+      testData.toDF()
     }
 
-    datasetPoissonLog = spark.createDataFrame(
-      sc.parallelize(generateGeneralizedLinearRegressionInput(
-        intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5),
-        xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
-        family = "poisson", link = "log"), 2))
-
-    datasetPoissonIdentity = spark.createDataFrame(
-      sc.parallelize(generateGeneralizedLinearRegressionInput(
-        intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
-        xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
-        family = "poisson", link = "identity"), 2))
-
-    datasetPoissonSqrt = spark.createDataFrame(
-      sc.parallelize(generateGeneralizedLinearRegressionInput(
-        intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
-        xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
-        family = "poisson", link = "sqrt"), 2))
-
-    datasetGammaInverse = spark.createDataFrame(
-      sc.parallelize(generateGeneralizedLinearRegressionInput(
-        intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
-        xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
-        family = "gamma", link = "inverse"), 2))
-
-    datasetGammaIdentity = spark.createDataFrame(
-      sc.parallelize(generateGeneralizedLinearRegressionInput(
-        intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
-        xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
-        family = "gamma", link = "identity"), 2))
-
-    datasetGammaLog = spark.createDataFrame(
-      sc.parallelize(generateGeneralizedLinearRegressionInput(
-        intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5),
-        xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
-        family = "gamma", link = "log"), 2))
+    datasetPoissonLog = generateGeneralizedLinearRegressionInput(
+      intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5),
+      xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
+      family = "poisson", link = "log").toDF()
+
+    datasetPoissonIdentity = generateGeneralizedLinearRegressionInput(
+      intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
+      xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
+      family = "poisson", link = "identity").toDF()
+
+    datasetPoissonSqrt = generateGeneralizedLinearRegressionInput(
+      intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
+      xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
+      family = "poisson", link = "sqrt").toDF()
+
+    datasetGammaInverse = generateGeneralizedLinearRegressionInput(
+      intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
+      xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
+      family = "gamma", link = "inverse").toDF()
+
+    datasetGammaIdentity = generateGeneralizedLinearRegressionInput(
+      intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5),
+      xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
+      family = "gamma", link = "identity").toDF()
+
+    datasetGammaLog = generateGeneralizedLinearRegressionInput(
+      intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5),
+      xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01,
+      family = "gamma", link = "log").toDF()
   }
 
   /**
@@ -540,12 +533,12 @@ class GeneralizedLinearRegressionSuite
        w <- c(1, 2, 3, 4)
        df <- as.data.frame(cbind(A, b))
      */
-    val datasetWithWeight = spark.createDataFrame(sc.parallelize(Seq(
+    val datasetWithWeight = Seq(
       Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse),
       Instance(19.0, 2.0, Vectors.dense(1.0, 7.0)),
       Instance(23.0, 3.0, Vectors.dense(2.0, 11.0)),
       Instance(29.0, 4.0, Vectors.dense(3.0, 13.0))
-    ), 2))
+    ).toDF()
     /*
        R code:
 
@@ -668,12 +661,12 @@ class GeneralizedLinearRegressionSuite
        w <- c(1, 2, 3, 4)
        df <- as.data.frame(cbind(A, b))
      */
-    val datasetWithWeight = spark.createDataFrame(sc.parallelize(Seq(
+    val datasetWithWeight = Seq(
       Instance(1.0, 1.0, Vectors.dense(0.0, 5.0).toSparse),
       Instance(0.0, 2.0, Vectors.dense(1.0, 2.0)),
       Instance(1.0, 3.0, Vectors.dense(2.0, 1.0)),
       Instance(0.0, 4.0, Vectors.dense(3.0, 3.0))
-    ), 2))
+    ).toDF()
     /*
        R code:
 
@@ -782,12 +775,12 @@ class GeneralizedLinearRegressionSuite
        w <- c(1, 2, 3, 4)
        df <- as.data.frame(cbind(A, b))
      */
-    val datasetWithWeight = spark.createDataFrame(sc.parallelize(Seq(
+    val datasetWithWeight = Seq(
       Instance(2.0, 1.0, Vectors.dense(0.0, 5.0).toSparse),
       Instance(8.0, 2.0, Vectors.dense(1.0, 7.0)),
       Instance(3.0, 3.0, Vectors.dense(2.0, 11.0)),
       Instance(9.0, 4.0, Vectors.dense(3.0, 13.0))
-    ), 2))
+    ).toDF()
     /*
        R code:
 
@@ -899,12 +892,12 @@ class GeneralizedLinearRegressionSuite
        w <- c(1, 2, 3, 4)
        df <- as.data.frame(cbind(A, b))
      */
-    val datasetWithWeight = spark.createDataFrame(sc.parallelize(Seq(
+    val datasetWithWeight = Seq(
       Instance(2.0, 1.0, Vectors.dense(0.0, 5.0).toSparse),
       Instance(8.0, 2.0, Vectors.dense(1.0, 7.0)),
       Instance(3.0, 3.0, Vectors.dense(2.0, 11.0)),
       Instance(9.0, 4.0, Vectors.dense(3.0, 13.0))
-    ), 2))
+    ).toDF()
     /*
        R code:
 
@@ -1054,12 +1047,12 @@ class GeneralizedLinearRegressionSuite
       [1] 12.92681
       [1] 13.32836
      */
-    val dataset = spark.createDataFrame(Seq(
+    val dataset = Seq(
       LabeledPoint(1, Vectors.dense(5, 0)),
       LabeledPoint(0, Vectors.dense(2, 1)),
       LabeledPoint(1, Vectors.dense(1, 2)),
       LabeledPoint(0, Vectors.dense(3, 3))
-    ))
+    ).toDF()
     val expected = Seq(12.88188, 12.92681, 13.32836)
 
     var idx = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
index 14d8a4e..c2c7947 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
@@ -27,15 +27,15 @@ import org.apache.spark.sql.{DataFrame, Row}
 class IsotonicRegressionSuite
   extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
+  import testImplicits._
+
   private def generateIsotonicInput(labels: Seq[Double]): DataFrame = {
-    spark.createDataFrame(
-      labels.zipWithIndex.map { case (label, i) => (label, i.toDouble, 1.0) }
-    ).toDF("label", "features", "weight")
+    labels.zipWithIndex.map { case (label, i) => (label, i.toDouble, 1.0) }
+      .toDF("label", "features", "weight")
   }
 
   private def generatePredictionInput(features: Seq[Double]): DataFrame = {
-    spark.createDataFrame(features.map(Tuple1.apply))
-      .toDF("features")
+    features.map(Tuple1.apply).toDF("features")
   }
 
   test("isotonic regression predictions") {
@@ -145,10 +145,10 @@ class IsotonicRegressionSuite
   }
 
   test("vector features column with feature index") {
-    val dataset = spark.createDataFrame(Seq(
+    val dataset = Seq(
       (4.0, Vectors.dense(0.0, 1.0)),
       (3.0, Vectors.dense(0.0, 2.0)),
-      (5.0, Vectors.sparse(2, Array(1), Array(3.0))))
+      (5.0, Vectors.sparse(2, Array(1), Array(3.0)))
     ).toDF("label", "features")
 
     val ir = new IsotonicRegression()

http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
index 265f2f4..5ae371b 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
@@ -32,6 +32,8 @@ import org.apache.spark.sql.{DataFrame, Row}
 class LinearRegressionSuite
   extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
+  import testImplicits._
+
   private val seed: Int = 42
   @transient var datasetWithDenseFeature: DataFrame = _
   @transient var datasetWithDenseFeatureWithoutIntercept: DataFrame = _
@@ -42,29 +44,27 @@ class LinearRegressionSuite
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    datasetWithDenseFeature = spark.createDataFrame(
-      sc.parallelize(LinearDataGenerator.generateLinearInput(
-        intercept = 6.3, weights = Array(4.7, 7.2), xMean = Array(0.9, -1.3),
-        xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.1), 2).map(_.asML))
+    datasetWithDenseFeature = sc.parallelize(LinearDataGenerator.generateLinearInput(
+      intercept = 6.3, weights = Array(4.7, 7.2), xMean = Array(0.9, -1.3),
+      xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.1), 2).map(_.asML).toDF()
     /*
       datasetWithDenseFeatureWithoutIntercept is not needed for correctness testing
       but is useful for illustrating training a model without an intercept
      */
-    datasetWithDenseFeatureWithoutIntercept = spark.createDataFrame(
-      sc.parallelize(LinearDataGenerator.generateLinearInput(
+    datasetWithDenseFeatureWithoutIntercept = sc.parallelize(
+      LinearDataGenerator.generateLinearInput(
         intercept = 0.0, weights = Array(4.7, 7.2), xMean = Array(0.9, -1.3),
-        xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.1), 2).map(_.asML))
+        xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.1), 2).map(_.asML).toDF()
 
     val r = new Random(seed)
    // When feature size is larger than 4096, normal optimizer is chosen
     // as the solver of linear regression in the case of "auto" mode.
     val featureSize = 4100
-    datasetWithSparseFeature = spark.createDataFrame(
-      sc.parallelize(LinearDataGenerator.generateLinearInput(
+    datasetWithSparseFeature = sc.parallelize(LinearDataGenerator.generateLinearInput(
        intercept = 0.0, weights = Seq.fill(featureSize)(r.nextDouble()).toArray,
        xMean = Seq.fill(featureSize)(r.nextDouble()).toArray,
        xVariance = Seq.fill(featureSize)(r.nextDouble()).toArray, nPoints = 200,
-        seed, eps = 0.1, sparsity = 0.7), 2).map(_.asML))
+        seed, eps = 0.1, sparsity = 0.7), 2).map(_.asML).toDF()
 
     /*
        R code:
@@ -74,13 +74,12 @@ class LinearRegressionSuite
        w <- c(1, 2, 3, 4)
        df <- as.data.frame(cbind(A, b))
      */
-    datasetWithWeight = spark.createDataFrame(
-      sc.parallelize(Seq(
-        Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse),
-        Instance(19.0, 2.0, Vectors.dense(1.0, 7.0)),
-        Instance(23.0, 3.0, Vectors.dense(2.0, 11.0)),
-        Instance(29.0, 4.0, Vectors.dense(3.0, 13.0))
-      ), 2))
+    datasetWithWeight = sc.parallelize(Seq(
+      Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse),
+      Instance(19.0, 2.0, Vectors.dense(1.0, 7.0)),
+      Instance(23.0, 3.0, Vectors.dense(2.0, 11.0)),
+      Instance(29.0, 4.0, Vectors.dense(3.0, 13.0))
+    ), 2).toDF()
 
     /*
        R code:
@@ -90,20 +89,18 @@ class LinearRegressionSuite
        w <- c(1, 2, 3, 4)
        df.const.label <- as.data.frame(cbind(A, b.const))
      */
-    datasetWithWeightConstantLabel = spark.createDataFrame(
-      sc.parallelize(Seq(
-        Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse),
-        Instance(17.0, 2.0, Vectors.dense(1.0, 7.0)),
-        Instance(17.0, 3.0, Vectors.dense(2.0, 11.0)),
-        Instance(17.0, 4.0, Vectors.dense(3.0, 13.0))
-      ), 2))
-    datasetWithWeightZeroLabel = spark.createDataFrame(
-      sc.parallelize(Seq(
-        Instance(0.0, 1.0, Vectors.dense(0.0, 5.0).toSparse),
-        Instance(0.0, 2.0, Vectors.dense(1.0, 7.0)),
-        Instance(0.0, 3.0, Vectors.dense(2.0, 11.0)),
-        Instance(0.0, 4.0, Vectors.dense(3.0, 13.0))
-      ), 2))
+    datasetWithWeightConstantLabel = sc.parallelize(Seq(
+      Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse),
+      Instance(17.0, 2.0, Vectors.dense(1.0, 7.0)),
+      Instance(17.0, 3.0, Vectors.dense(2.0, 11.0)),
+      Instance(17.0, 4.0, Vectors.dense(3.0, 13.0))
+    ), 2).toDF()
+    datasetWithWeightZeroLabel = sc.parallelize(Seq(
+      Instance(0.0, 1.0, Vectors.dense(0.0, 5.0).toSparse),
+      Instance(0.0, 2.0, Vectors.dense(1.0, 7.0)),
+      Instance(0.0, 3.0, Vectors.dense(2.0, 11.0)),
+      Instance(0.0, 4.0, Vectors.dense(3.0, 13.0))
+    ), 2).toDF()
   }
 
   /**
@@ -839,8 +836,7 @@ class LinearRegressionSuite
         }
         val data2 = weightedSignedData ++ weightedNoiseData
 
-        (spark.createDataFrame(sc.parallelize(data1, 4)),
-          spark.createDataFrame(sc.parallelize(data2, 4)))
+        (sc.parallelize(data1, 4).toDF(), sc.parallelize(data2, 4).toDF())
       }
 
       val trainer1a = (new LinearRegression).setFitIntercept(true)

http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/tree/impl/GradientBoostedTreesSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/GradientBoostedTreesSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/GradientBoostedTreesSuite.scala
index 5c50a88..4109a29 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/GradientBoostedTreesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/GradientBoostedTreesSuite.scala
@@ -32,13 +32,15 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
  */
 class GradientBoostedTreesSuite extends SparkFunSuite with MLlibTestSparkContext with Logging {
 
+  import testImplicits._
+
   test("runWithValidation stops early and performs better on a validation 
dataset") {
     // Set numIterations large enough so that it stops early.
     val numIterations = 20
     val trainRdd = sc.parallelize(OldGBTSuite.trainData, 2).map(_.asML)
     val validateRdd = sc.parallelize(OldGBTSuite.validateData, 2).map(_.asML)
-    val trainDF = spark.createDataFrame(trainRdd)
-    val validateDF = spark.createDataFrame(validateRdd)
+    val trainDF = trainRdd.toDF()
+    val validateDF = validateRdd.toDF()
 
     val algos = Array(Regression, Regression, Classification)
     val losses = Array(SquaredError, AbsoluteError, LogLoss)

http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala
index 750dc5b..7116265 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala
@@ -35,12 +35,13 @@ import org.apache.spark.sql.types.StructType
 class CrossValidatorSuite
   extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
+  import testImplicits._
+
   @transient var dataset: Dataset[_] = _
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    dataset = spark.createDataFrame(
-      sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2))
+    dataset = sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2).toDF()
   }
 
   test("cross validation with logistic regression") {
@@ -67,9 +68,10 @@ class CrossValidatorSuite
   }
 
   test("cross validation with linear regression") {
-    val dataset = spark.createDataFrame(
-      sc.parallelize(LinearDataGenerator.generateLinearInput(
-        6.3, Array(4.7, 7.2), Array(0.9, -1.3), Array(0.7, 1.2), 100, 42, 0.1), 2).map(_.asML))
+    val dataset = sc.parallelize(
+      LinearDataGenerator.generateLinearInput(
+        6.3, Array(4.7, 7.2), Array(0.9, -1.3), Array(0.7, 1.2), 100, 42, 0.1), 2)
+      .map(_.asML).toDF()
 
     val trainer = new LinearRegression().setSolver("l-bfgs")
     val lrParamMaps = new ParamGridBuilder()

http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala
index 9971371..87100ae 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala
@@ -33,9 +33,11 @@ import org.apache.spark.sql.types.StructType
 
 class TrainValidationSplitSuite
   extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+
+  import testImplicits._
+
   test("train validation with logistic regression") {
-    val dataset = spark.createDataFrame(
-      sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2))
+    val dataset = sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2).toDF()
 
     val lr = new LogisticRegression
     val lrParamMaps = new ParamGridBuilder()
@@ -58,9 +60,10 @@ class TrainValidationSplitSuite
   }
 
   test("train validation with linear regression") {
-    val dataset = spark.createDataFrame(
-        sc.parallelize(LinearDataGenerator.generateLinearInput(
-            6.3, Array(4.7, 7.2), Array(0.9, -1.3), Array(0.7, 1.2), 100, 42, 0.1), 2).map(_.asML))
+    val dataset = sc.parallelize(
+      LinearDataGenerator.generateLinearInput(
+        6.3, Array(4.7, 7.2), Array(0.9, -1.3), Array(0.7, 1.2), 100, 42, 0.1), 2)
+      .map(_.asML).toDF()
 
     val trainer = new LinearRegression().setSolver("l-bfgs")
     val lrParamMaps = new ParamGridBuilder()

http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
index 6aa93c9..e4e9be3 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -37,6 +37,8 @@ import org.apache.spark.util.Utils
 
 class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
 
+  import testImplicits._
+
   test("epsilon computation") {
     assert(1.0 + EPSILON > 1.0, s"EPSILON is too small: $EPSILON.")
     assert(1.0 + EPSILON / 2.0 === 1.0, s"EPSILON is too big: $EPSILON.")
@@ -255,9 +257,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
     val z = Vectors.dense(4.0)
     val p = (5.0, z)
     val w = Vectors.dense(6.0).asML
-    val df = spark.createDataFrame(Seq(
-      (0, x, y, p, w)
-    )).toDF("id", "x", "y", "p", "w")
+    val df = Seq((0, x, y, p, w)).toDF("id", "x", "y", "p", "w")
       .withColumn("x", col("x"), metadata)
     val newDF1 = convertVectorColumnsToML(df)
     assert(newDF1.schema("x").metadata === metadata, "Metadata should be 
preserved.")
@@ -282,9 +282,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
     val z = Vectors.dense(4.0).asML
     val p = (5.0, z)
     val w = Vectors.dense(6.0)
-    val df = spark.createDataFrame(Seq(
-      (0, x, y, p, w)
-    )).toDF("id", "x", "y", "p", "w")
+    val df = Seq((0, x, y, p, w)).toDF("id", "x", "y", "p", "w")
       .withColumn("x", col("x"), metadata)
     val newDF1 = convertVectorColumnsFromML(df)
     assert(newDF1.schema("x").metadata === metadata, "Metadata should be 
preserved.")
@@ -309,9 +307,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
     val z = Matrices.ones(1, 1)
     val p = (5.0, z)
     val w = Matrices.dense(1, 1, Array(4.5)).asML
-    val df = spark.createDataFrame(Seq(
-      (0, x, y, p, w)
-    )).toDF("id", "x", "y", "p", "w")
+    val df = Seq((0, x, y, p, w)).toDF("id", "x", "y", "p", "w")
       .withColumn("x", col("x"), metadata)
     val newDF1 = convertMatrixColumnsToML(df)
     assert(newDF1.schema("x").metadata === metadata, "Metadata should be 
preserved.")
@@ -336,9 +332,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
     val z = Matrices.ones(1, 1).asML
     val p = (5.0, z)
     val w = Matrices.dense(1, 1, Array(4.5))
-    val df = spark.createDataFrame(Seq(
-      (0, x, y, p, w)
-    )).toDF("id", "x", "y", "p", "w")
+    val df = Seq((0, x, y, p, w)).toDF("id", "x", "y", "p", "w")
       .withColumn("x", col("x"), metadata)
     val newDF1 = convertMatrixColumnsFromML(df)
     assert(newDF1.schema("x").metadata === metadata, "Metadata should be 
preserved.")

http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala
index db56aff..6bb7ed9 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala
@@ -23,7 +23,7 @@ import org.scalatest.Suite
 
 import org.apache.spark.SparkContext
 import org.apache.spark.ml.util.TempDirectory
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{SparkSession, SQLContext, SQLImplicits}
 import org.apache.spark.util.Utils
 
 trait MLlibTestSparkContext extends TempDirectory { self: Suite =>
@@ -55,4 +55,15 @@ trait MLlibTestSparkContext extends TempDirectory { self: Suite =>
       super.afterAll()
     }
   }
+
+  /**
+   * A helper object for importing SQL implicits.
+   *
+   * Note that the alternative of importing `spark.implicits._` is not possible here.
+   * This is because we create the [[SQLContext]] immediately before the first test is run,
+   * but the implicits import is needed in the constructor.
+   */
+  protected object testImplicits extends SQLImplicits {
+    protected override def _sqlContext: SQLContext = self.spark.sqlContext
+  }
 }
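
For reference, a minimal sketch of how a suite picks this up (ExampleSuite is
hypothetical, not part of this commit):

  class ExampleSuite extends SparkFunSuite with MLlibTestSparkContext {

    // Brings toDF() and the other SQLImplicits into scope, backed by the
    // SparkSession that MLlibTestSparkContext creates in beforeAll().
    import testImplicits._

    test("local Seq to DataFrame") {
      val df = Seq((0, "a"), (1, "b")).toDF("id", "label")
      assert(df.count() === 2)
    }
  }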

