Repository: spark Updated Branches: refs/heads/master 50b89d05b -> f234b7cd7
http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala index b478fea..a6bbb94 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala @@ -29,6 +29,8 @@ import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructTy class StringIndexerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { + import testImplicits._ + test("params") { ParamsSuite.checkParams(new StringIndexer) val model = new StringIndexerModel("indexer", Array("a", "b")) @@ -38,8 +40,8 @@ class StringIndexerSuite } test("StringIndexer") { - val data = sc.parallelize(Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")), 2) - val df = spark.createDataFrame(data).toDF("id", "label") + val data = Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")) + val df = data.toDF("id", "label") val indexer = new StringIndexer() .setInputCol("label") .setOutputCol("labelIndex") @@ -61,10 +63,10 @@ class StringIndexerSuite } test("StringIndexerUnseen") { - val data = sc.parallelize(Seq((0, "a"), (1, "b"), (4, "b")), 2) - val data2 = sc.parallelize(Seq((0, "a"), (1, "b"), (2, "c")), 2) - val df = spark.createDataFrame(data).toDF("id", "label") - val df2 = spark.createDataFrame(data2).toDF("id", "label") + val data = Seq((0, "a"), (1, "b"), (4, "b")) + val data2 = Seq((0, "a"), (1, "b"), (2, "c")) + val df = data.toDF("id", "label") + val df2 = data2.toDF("id", "label") val indexer = new StringIndexer() .setInputCol("label") .setOutputCol("labelIndex") @@ -92,8 +94,8 @@ class StringIndexerSuite } test("StringIndexer with a numeric input column") { - val data = sc.parallelize(Seq((0, 100), (1, 200), (2, 300), (3, 100), (4, 100), (5, 300)), 2) - val df = spark.createDataFrame(data).toDF("id", "label") + val data = Seq((0, 100), (1, 200), (2, 300), (3, 100), (4, 100), (5, 300)) + val df = data.toDF("id", "label") val indexer = new StringIndexer() .setInputCol("label") .setOutputCol("labelIndex") @@ -119,7 +121,7 @@ class StringIndexerSuite } test("StringIndexerModel can't overwrite output column") { - val df = spark.createDataFrame(Seq((1, 2), (3, 4))).toDF("input", "output") + val df = Seq((1, 2), (3, 4)).toDF("input", "output") intercept[IllegalArgumentException] { new StringIndexer() .setInputCol("input") @@ -161,9 +163,7 @@ class StringIndexerSuite test("IndexToString.transform") { val labels = Array("a", "b", "c") - val df0 = spark.createDataFrame(Seq( - (0, "a"), (1, "b"), (2, "c"), (0, "a") - )).toDF("index", "expected") + val df0 = Seq((0, "a"), (1, "b"), (2, "c"), (0, "a")).toDF("index", "expected") val idxToStr0 = new IndexToString() .setInputCol("index") @@ -187,8 +187,8 @@ class StringIndexerSuite } test("StringIndexer, IndexToString are inverses") { - val data = sc.parallelize(Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")), 2) - val df = spark.createDataFrame(data).toDF("id", "label") + val data = Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")) + val df = data.toDF("id", "label") val indexer = new StringIndexer() .setInputCol("label") .setOutputCol("labelIndex") @@ -220,8 +220,8 @@ class StringIndexerSuite } test("StringIndexer metadata") { - val data = sc.parallelize(Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")), 2) - val df = spark.createDataFrame(data).toDF("id", "label") + val data = Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")) + val df = data.toDF("id", "label") val indexer = new StringIndexer() .setInputCol("label") .setOutputCol("labelIndex") http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala index f30bdc3..c895659 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala @@ -46,6 +46,7 @@ class RegexTokenizerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { import org.apache.spark.ml.feature.RegexTokenizerSuite._ + import testImplicits._ test("params") { ParamsSuite.checkParams(new RegexTokenizer) @@ -57,26 +58,26 @@ class RegexTokenizerSuite .setPattern("\\w+|\\p{Punct}") .setInputCol("rawText") .setOutputCol("tokens") - val dataset0 = spark.createDataFrame(Seq( + val dataset0 = Seq( TokenizerTestData("Test for tokenization.", Array("test", "for", "tokenization", ".")), TokenizerTestData("Te,st. punct", Array("te", ",", "st", ".", "punct")) - )) + ).toDF() testRegexTokenizer(tokenizer0, dataset0) - val dataset1 = spark.createDataFrame(Seq( + val dataset1 = Seq( TokenizerTestData("Test for tokenization.", Array("test", "for", "tokenization")), TokenizerTestData("Te,st. punct", Array("punct")) - )) + ).toDF() tokenizer0.setMinTokenLength(3) testRegexTokenizer(tokenizer0, dataset1) val tokenizer2 = new RegexTokenizer() .setInputCol("rawText") .setOutputCol("tokens") - val dataset2 = spark.createDataFrame(Seq( + val dataset2 = Seq( TokenizerTestData("Test for tokenization.", Array("test", "for", "tokenization.")), TokenizerTestData("Te,st. punct", Array("te,st.", "punct")) - )) + ).toDF() testRegexTokenizer(tokenizer2, dataset2) } @@ -85,10 +86,10 @@ class RegexTokenizerSuite .setInputCol("rawText") .setOutputCol("tokens") .setToLowercase(false) - val dataset = spark.createDataFrame(Seq( + val dataset = Seq( TokenizerTestData("JAVA SCALA", Array("JAVA", "SCALA")), TokenizerTestData("java scala", Array("java", "scala")) - )) + ).toDF() testRegexTokenizer(tokenizer, dataset) } http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala index 561493f..46cced3 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala @@ -29,6 +29,8 @@ import org.apache.spark.sql.functions.col class VectorAssemblerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { + import testImplicits._ + test("params") { ParamsSuite.checkParams(new VectorAssembler) } @@ -57,9 +59,9 @@ class VectorAssemblerSuite } test("VectorAssembler") { - val df = spark.createDataFrame(Seq( + val df = Seq( (0, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 10L) - )).toDF("id", "x", "y", "name", "z", "n") + ).toDF("id", "x", "y", "name", "z", "n") val assembler = new VectorAssembler() .setInputCols(Array("x", "y", "z", "n")) .setOutputCol("features") @@ -70,7 +72,7 @@ class VectorAssemblerSuite } test("transform should throw an exception in case of unsupported type") { - val df = spark.createDataFrame(Seq(("a", "b", "c"))).toDF("a", "b", "c") + val df = Seq(("a", "b", "c")).toDF("a", "b", "c") val assembler = new VectorAssembler() .setInputCols(Array("a", "b", "c")) .setOutputCol("features") @@ -87,7 +89,7 @@ class VectorAssemblerSuite NominalAttribute.defaultAttr.withName("gender").withValues("male", "female"), NumericAttribute.defaultAttr.withName("salary"))) val row = (1.0, 0.5, 1, Vectors.dense(1.0, 1000.0), Vectors.sparse(2, Array(1), Array(2.0))) - val df = spark.createDataFrame(Seq(row)).toDF("browser", "hour", "count", "user", "ad") + val df = Seq(row).toDF("browser", "hour", "count", "user", "ad") .select( col("browser").as("browser", browser.toMetadata()), col("hour").as("hour", hour.toMetadata()), http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala index 7071423..4da1b13 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.DataFrame class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest with Logging { + import testImplicits._ import VectorIndexerSuite.FeatureData // identical, of length 3 @@ -85,11 +86,13 @@ class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext checkPair(densePoints1Seq, sparsePoints1Seq) checkPair(densePoints2Seq, sparsePoints2Seq) - densePoints1 = spark.createDataFrame(sc.parallelize(densePoints1Seq, 2).map(FeatureData)) - sparsePoints1 = spark.createDataFrame(sc.parallelize(sparsePoints1Seq, 2).map(FeatureData)) - densePoints2 = spark.createDataFrame(sc.parallelize(densePoints2Seq, 2).map(FeatureData)) - sparsePoints2 = spark.createDataFrame(sc.parallelize(sparsePoints2Seq, 2).map(FeatureData)) - badPoints = spark.createDataFrame(sc.parallelize(badPointsSeq, 2).map(FeatureData)) + densePoints1 = densePoints1Seq.map(FeatureData).toDF() + sparsePoints1 = sparsePoints1Seq.map(FeatureData).toDF() + // TODO: If we directly use `toDF` without parallelize, the test in + // "Throws error when given RDDs with different size vectors" is failed for an unknown reason. + densePoints2 = sc.parallelize(densePoints2Seq, 2).map(FeatureData).toDF() + sparsePoints2 = sparsePoints2Seq.map(FeatureData).toDF() + badPoints = badPointsSeq.map(FeatureData).toDF() } private def getIndexer: VectorIndexer = @@ -102,7 +105,7 @@ class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext } test("Cannot fit an empty DataFrame") { - val rdd = spark.createDataFrame(sc.parallelize(Array.empty[Vector], 2).map(FeatureData)) + val rdd = Array.empty[Vector].map(FeatureData).toSeq.toDF() val vectorIndexer = getIndexer intercept[IllegalArgumentException] { vectorIndexer.fit(rdd) http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala index 1c70b70..0fdfdf3 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/AFTSurvivalRegressionSuite.scala @@ -31,23 +31,22 @@ import org.apache.spark.sql.{DataFrame, Row} class AFTSurvivalRegressionSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { + import testImplicits._ + @transient var datasetUnivariate: DataFrame = _ @transient var datasetMultivariate: DataFrame = _ @transient var datasetUnivariateScaled: DataFrame = _ override def beforeAll(): Unit = { super.beforeAll() - datasetUnivariate = spark.createDataFrame( - sc.parallelize(generateAFTInput( - 1, Array(5.5), Array(0.8), 1000, 42, 1.0, 2.0, 2.0))) - datasetMultivariate = spark.createDataFrame( - sc.parallelize(generateAFTInput( - 2, Array(0.9, -1.3), Array(0.7, 1.2), 1000, 42, 1.5, 2.5, 2.0))) - datasetUnivariateScaled = spark.createDataFrame( - sc.parallelize(generateAFTInput( - 1, Array(5.5), Array(0.8), 1000, 42, 1.0, 2.0, 2.0)).map { x => - AFTPoint(Vectors.dense(x.features(0) * 1.0E3), x.label, x.censor) - }) + datasetUnivariate = generateAFTInput( + 1, Array(5.5), Array(0.8), 1000, 42, 1.0, 2.0, 2.0).toDF() + datasetMultivariate = generateAFTInput( + 2, Array(0.9, -1.3), Array(0.7, 1.2), 1000, 42, 1.5, 2.5, 2.0).toDF() + datasetUnivariateScaled = sc.parallelize( + generateAFTInput(1, Array(5.5), Array(0.8), 1000, 42, 1.0, 2.0, 2.0)).map { x => + AFTPoint(Vectors.dense(x.features(0) * 1.0E3), x.label, x.censor) + }.toDF() } /** @@ -396,9 +395,8 @@ class AFTSurvivalRegressionSuite // the parallelism is bigger than that. Because the issue was about `AFTAggregator`s // being merged incorrectly when it has an empty partition, running the codes below // should not throw an exception. - val dataset = spark.createDataFrame( - sc.parallelize(generateAFTInput( - 1, Array(5.5), Array(0.8), 2, 42, 1.0, 2.0, 2.0), numSlices = 3)) + val dataset = sc.parallelize(generateAFTInput( + 1, Array(5.5), Array(0.8), 2, 42, 1.0, 2.0, 2.0), numSlices = 3).toDF() val trainer = new AFTSurvivalRegression() trainer.fit(dataset) } http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala index 7b5df8f..dcf3f9a 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala @@ -37,6 +37,7 @@ class GBTRegressorSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { import GBTRegressorSuite.compareAPIs + import testImplicits._ // Combinations for estimators, learning rates and subsamplingRate private val testCombinations = @@ -76,14 +77,14 @@ class GBTRegressorSuite extends SparkFunSuite with MLlibTestSparkContext } test("GBTRegressor behaves reasonably on toy data") { - val df = spark.createDataFrame(Seq( + val df = Seq( LabeledPoint(10, Vectors.dense(1, 2, 3, 4)), LabeledPoint(-5, Vectors.dense(6, 3, 2, 1)), LabeledPoint(11, Vectors.dense(2, 2, 3, 4)), LabeledPoint(-6, Vectors.dense(6, 4, 2, 1)), LabeledPoint(9, Vectors.dense(1, 2, 6, 4)), LabeledPoint(-4, Vectors.dense(6, 3, 2, 2)) - )) + ).toDF() val gbt = new GBTRegressor() .setMaxDepth(2) .setMaxIter(2) @@ -103,7 +104,7 @@ class GBTRegressorSuite extends SparkFunSuite with MLlibTestSparkContext val path = tempDir.toURI.toString sc.setCheckpointDir(path) - val df = spark.createDataFrame(data) + val df = data.toDF() val gbt = new GBTRegressor() .setMaxDepth(2) .setMaxIter(5) http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index d8032c4..937aa7d 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -35,6 +35,8 @@ import org.apache.spark.sql.functions._ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { + import testImplicits._ + private val seed: Int = 42 @transient var datasetGaussianIdentity: DataFrame = _ @transient var datasetGaussianLog: DataFrame = _ @@ -52,23 +54,20 @@ class GeneralizedLinearRegressionSuite import GeneralizedLinearRegressionSuite._ - datasetGaussianIdentity = spark.createDataFrame( - sc.parallelize(generateGeneralizedLinearRegressionInput( - intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), - xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, - family = "gaussian", link = "identity"), 2)) + datasetGaussianIdentity = generateGeneralizedLinearRegressionInput( + intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, + family = "gaussian", link = "identity").toDF() - datasetGaussianLog = spark.createDataFrame( - sc.parallelize(generateGeneralizedLinearRegressionInput( - intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5), - xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, - family = "gaussian", link = "log"), 2)) + datasetGaussianLog = generateGeneralizedLinearRegressionInput( + intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5), + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, + family = "gaussian", link = "log").toDF() - datasetGaussianInverse = spark.createDataFrame( - sc.parallelize(generateGeneralizedLinearRegressionInput( - intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), - xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, - family = "gaussian", link = "inverse"), 2)) + datasetGaussianInverse = generateGeneralizedLinearRegressionInput( + intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, + family = "gaussian", link = "inverse").toDF() datasetBinomial = { val nPoints = 10000 @@ -80,44 +79,38 @@ class GeneralizedLinearRegressionSuite generateMultinomialLogisticInput(coefficients, xMean, xVariance, addIntercept = true, nPoints, seed) - spark.createDataFrame(sc.parallelize(testData, 2)) + testData.toDF() } - datasetPoissonLog = spark.createDataFrame( - sc.parallelize(generateGeneralizedLinearRegressionInput( - intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5), - xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, - family = "poisson", link = "log"), 2)) - - datasetPoissonIdentity = spark.createDataFrame( - sc.parallelize(generateGeneralizedLinearRegressionInput( - intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), - xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, - family = "poisson", link = "identity"), 2)) - - datasetPoissonSqrt = spark.createDataFrame( - sc.parallelize(generateGeneralizedLinearRegressionInput( - intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), - xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, - family = "poisson", link = "sqrt"), 2)) - - datasetGammaInverse = spark.createDataFrame( - sc.parallelize(generateGeneralizedLinearRegressionInput( - intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), - xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, - family = "gamma", link = "inverse"), 2)) - - datasetGammaIdentity = spark.createDataFrame( - sc.parallelize(generateGeneralizedLinearRegressionInput( - intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), - xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, - family = "gamma", link = "identity"), 2)) - - datasetGammaLog = spark.createDataFrame( - sc.parallelize(generateGeneralizedLinearRegressionInput( - intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5), - xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, - family = "gamma", link = "log"), 2)) + datasetPoissonLog = generateGeneralizedLinearRegressionInput( + intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5), + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, + family = "poisson", link = "log").toDF() + + datasetPoissonIdentity = generateGeneralizedLinearRegressionInput( + intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, + family = "poisson", link = "identity").toDF() + + datasetPoissonSqrt = generateGeneralizedLinearRegressionInput( + intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, + family = "poisson", link = "sqrt").toDF() + + datasetGammaInverse = generateGeneralizedLinearRegressionInput( + intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, + family = "gamma", link = "inverse").toDF() + + datasetGammaIdentity = generateGeneralizedLinearRegressionInput( + intercept = 2.5, coefficients = Array(2.2, 0.6), xMean = Array(2.9, 10.5), + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, + family = "gamma", link = "identity").toDF() + + datasetGammaLog = generateGeneralizedLinearRegressionInput( + intercept = 0.25, coefficients = Array(0.22, 0.06), xMean = Array(2.9, 10.5), + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, noiseLevel = 0.01, + family = "gamma", link = "log").toDF() } /** @@ -540,12 +533,12 @@ class GeneralizedLinearRegressionSuite w <- c(1, 2, 3, 4) df <- as.data.frame(cbind(A, b)) */ - val datasetWithWeight = spark.createDataFrame(sc.parallelize(Seq( + val datasetWithWeight = Seq( Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse), Instance(19.0, 2.0, Vectors.dense(1.0, 7.0)), Instance(23.0, 3.0, Vectors.dense(2.0, 11.0)), Instance(29.0, 4.0, Vectors.dense(3.0, 13.0)) - ), 2)) + ).toDF() /* R code: @@ -668,12 +661,12 @@ class GeneralizedLinearRegressionSuite w <- c(1, 2, 3, 4) df <- as.data.frame(cbind(A, b)) */ - val datasetWithWeight = spark.createDataFrame(sc.parallelize(Seq( + val datasetWithWeight = Seq( Instance(1.0, 1.0, Vectors.dense(0.0, 5.0).toSparse), Instance(0.0, 2.0, Vectors.dense(1.0, 2.0)), Instance(1.0, 3.0, Vectors.dense(2.0, 1.0)), Instance(0.0, 4.0, Vectors.dense(3.0, 3.0)) - ), 2)) + ).toDF() /* R code: @@ -782,12 +775,12 @@ class GeneralizedLinearRegressionSuite w <- c(1, 2, 3, 4) df <- as.data.frame(cbind(A, b)) */ - val datasetWithWeight = spark.createDataFrame(sc.parallelize(Seq( + val datasetWithWeight = Seq( Instance(2.0, 1.0, Vectors.dense(0.0, 5.0).toSparse), Instance(8.0, 2.0, Vectors.dense(1.0, 7.0)), Instance(3.0, 3.0, Vectors.dense(2.0, 11.0)), Instance(9.0, 4.0, Vectors.dense(3.0, 13.0)) - ), 2)) + ).toDF() /* R code: @@ -899,12 +892,12 @@ class GeneralizedLinearRegressionSuite w <- c(1, 2, 3, 4) df <- as.data.frame(cbind(A, b)) */ - val datasetWithWeight = spark.createDataFrame(sc.parallelize(Seq( + val datasetWithWeight = Seq( Instance(2.0, 1.0, Vectors.dense(0.0, 5.0).toSparse), Instance(8.0, 2.0, Vectors.dense(1.0, 7.0)), Instance(3.0, 3.0, Vectors.dense(2.0, 11.0)), Instance(9.0, 4.0, Vectors.dense(3.0, 13.0)) - ), 2)) + ).toDF() /* R code: @@ -1054,12 +1047,12 @@ class GeneralizedLinearRegressionSuite [1] 12.92681 [1] 13.32836 */ - val dataset = spark.createDataFrame(Seq( + val dataset = Seq( LabeledPoint(1, Vectors.dense(5, 0)), LabeledPoint(0, Vectors.dense(2, 1)), LabeledPoint(1, Vectors.dense(1, 2)), LabeledPoint(0, Vectors.dense(3, 3)) - )) + ).toDF() val expected = Seq(12.88188, 12.92681, 13.32836) var idx = 0 http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala index 14d8a4e..c2c7947 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala @@ -27,15 +27,15 @@ import org.apache.spark.sql.{DataFrame, Row} class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { + import testImplicits._ + private def generateIsotonicInput(labels: Seq[Double]): DataFrame = { - spark.createDataFrame( - labels.zipWithIndex.map { case (label, i) => (label, i.toDouble, 1.0) } - ).toDF("label", "features", "weight") + labels.zipWithIndex.map { case (label, i) => (label, i.toDouble, 1.0) } + .toDF("label", "features", "weight") } private def generatePredictionInput(features: Seq[Double]): DataFrame = { - spark.createDataFrame(features.map(Tuple1.apply)) - .toDF("features") + features.map(Tuple1.apply).toDF("features") } test("isotonic regression predictions") { @@ -145,10 +145,10 @@ class IsotonicRegressionSuite } test("vector features column with feature index") { - val dataset = spark.createDataFrame(Seq( + val dataset = Seq( (4.0, Vectors.dense(0.0, 1.0)), (3.0, Vectors.dense(0.0, 2.0)), - (5.0, Vectors.sparse(2, Array(1), Array(3.0)))) + (5.0, Vectors.sparse(2, Array(1), Array(3.0))) ).toDF("label", "features") val ir = new IsotonicRegression() http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala index 265f2f4..5ae371b 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala @@ -32,6 +32,8 @@ import org.apache.spark.sql.{DataFrame, Row} class LinearRegressionSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { + import testImplicits._ + private val seed: Int = 42 @transient var datasetWithDenseFeature: DataFrame = _ @transient var datasetWithDenseFeatureWithoutIntercept: DataFrame = _ @@ -42,29 +44,27 @@ class LinearRegressionSuite override def beforeAll(): Unit = { super.beforeAll() - datasetWithDenseFeature = spark.createDataFrame( - sc.parallelize(LinearDataGenerator.generateLinearInput( - intercept = 6.3, weights = Array(4.7, 7.2), xMean = Array(0.9, -1.3), - xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.1), 2).map(_.asML)) + datasetWithDenseFeature = sc.parallelize(LinearDataGenerator.generateLinearInput( + intercept = 6.3, weights = Array(4.7, 7.2), xMean = Array(0.9, -1.3), + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.1), 2).map(_.asML).toDF() /* datasetWithDenseFeatureWithoutIntercept is not needed for correctness testing but is useful for illustrating training model without intercept */ - datasetWithDenseFeatureWithoutIntercept = spark.createDataFrame( - sc.parallelize(LinearDataGenerator.generateLinearInput( + datasetWithDenseFeatureWithoutIntercept = sc.parallelize( + LinearDataGenerator.generateLinearInput( intercept = 0.0, weights = Array(4.7, 7.2), xMean = Array(0.9, -1.3), - xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.1), 2).map(_.asML)) + xVariance = Array(0.7, 1.2), nPoints = 10000, seed, eps = 0.1), 2).map(_.asML).toDF() val r = new Random(seed) // When feature size is larger than 4096, normal optimizer is choosed // as the solver of linear regression in the case of "auto" mode. val featureSize = 4100 - datasetWithSparseFeature = spark.createDataFrame( - sc.parallelize(LinearDataGenerator.generateLinearInput( + datasetWithSparseFeature = sc.parallelize(LinearDataGenerator.generateLinearInput( intercept = 0.0, weights = Seq.fill(featureSize)(r.nextDouble()).toArray, xMean = Seq.fill(featureSize)(r.nextDouble()).toArray, xVariance = Seq.fill(featureSize)(r.nextDouble()).toArray, nPoints = 200, - seed, eps = 0.1, sparsity = 0.7), 2).map(_.asML)) + seed, eps = 0.1, sparsity = 0.7), 2).map(_.asML).toDF() /* R code: @@ -74,13 +74,12 @@ class LinearRegressionSuite w <- c(1, 2, 3, 4) df <- as.data.frame(cbind(A, b)) */ - datasetWithWeight = spark.createDataFrame( - sc.parallelize(Seq( - Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse), - Instance(19.0, 2.0, Vectors.dense(1.0, 7.0)), - Instance(23.0, 3.0, Vectors.dense(2.0, 11.0)), - Instance(29.0, 4.0, Vectors.dense(3.0, 13.0)) - ), 2)) + datasetWithWeight = sc.parallelize(Seq( + Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse), + Instance(19.0, 2.0, Vectors.dense(1.0, 7.0)), + Instance(23.0, 3.0, Vectors.dense(2.0, 11.0)), + Instance(29.0, 4.0, Vectors.dense(3.0, 13.0)) + ), 2).toDF() /* R code: @@ -90,20 +89,18 @@ class LinearRegressionSuite w <- c(1, 2, 3, 4) df.const.label <- as.data.frame(cbind(A, b.const)) */ - datasetWithWeightConstantLabel = spark.createDataFrame( - sc.parallelize(Seq( - Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse), - Instance(17.0, 2.0, Vectors.dense(1.0, 7.0)), - Instance(17.0, 3.0, Vectors.dense(2.0, 11.0)), - Instance(17.0, 4.0, Vectors.dense(3.0, 13.0)) - ), 2)) - datasetWithWeightZeroLabel = spark.createDataFrame( - sc.parallelize(Seq( - Instance(0.0, 1.0, Vectors.dense(0.0, 5.0).toSparse), - Instance(0.0, 2.0, Vectors.dense(1.0, 7.0)), - Instance(0.0, 3.0, Vectors.dense(2.0, 11.0)), - Instance(0.0, 4.0, Vectors.dense(3.0, 13.0)) - ), 2)) + datasetWithWeightConstantLabel = sc.parallelize(Seq( + Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse), + Instance(17.0, 2.0, Vectors.dense(1.0, 7.0)), + Instance(17.0, 3.0, Vectors.dense(2.0, 11.0)), + Instance(17.0, 4.0, Vectors.dense(3.0, 13.0)) + ), 2).toDF() + datasetWithWeightZeroLabel = sc.parallelize(Seq( + Instance(0.0, 1.0, Vectors.dense(0.0, 5.0).toSparse), + Instance(0.0, 2.0, Vectors.dense(1.0, 7.0)), + Instance(0.0, 3.0, Vectors.dense(2.0, 11.0)), + Instance(0.0, 4.0, Vectors.dense(3.0, 13.0)) + ), 2).toDF() } /** @@ -839,8 +836,7 @@ class LinearRegressionSuite } val data2 = weightedSignedData ++ weightedNoiseData - (spark.createDataFrame(sc.parallelize(data1, 4)), - spark.createDataFrame(sc.parallelize(data2, 4))) + (sc.parallelize(data1, 4).toDF(), sc.parallelize(data2, 4).toDF()) } val trainer1a = (new LinearRegression).setFitIntercept(true) http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/tree/impl/GradientBoostedTreesSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/GradientBoostedTreesSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/GradientBoostedTreesSuite.scala index 5c50a88..4109a29 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/GradientBoostedTreesSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/GradientBoostedTreesSuite.scala @@ -32,13 +32,15 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext */ class GradientBoostedTreesSuite extends SparkFunSuite with MLlibTestSparkContext with Logging { + import testImplicits._ + test("runWithValidation stops early and performs better on a validation dataset") { // Set numIterations large enough so that it stops early. val numIterations = 20 val trainRdd = sc.parallelize(OldGBTSuite.trainData, 2).map(_.asML) val validateRdd = sc.parallelize(OldGBTSuite.validateData, 2).map(_.asML) - val trainDF = spark.createDataFrame(trainRdd) - val validateDF = spark.createDataFrame(validateRdd) + val trainDF = trainRdd.toDF() + val validateDF = validateRdd.toDF() val algos = Array(Regression, Regression, Classification) val losses = Array(SquaredError, AbsoluteError, LogLoss) http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala index 750dc5b..7116265 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala @@ -35,12 +35,13 @@ import org.apache.spark.sql.types.StructType class CrossValidatorSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { + import testImplicits._ + @transient var dataset: Dataset[_] = _ override def beforeAll(): Unit = { super.beforeAll() - dataset = spark.createDataFrame( - sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2)) + dataset = sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2).toDF() } test("cross validation with logistic regression") { @@ -67,9 +68,10 @@ class CrossValidatorSuite } test("cross validation with linear regression") { - val dataset = spark.createDataFrame( - sc.parallelize(LinearDataGenerator.generateLinearInput( - 6.3, Array(4.7, 7.2), Array(0.9, -1.3), Array(0.7, 1.2), 100, 42, 0.1), 2).map(_.asML)) + val dataset = sc.parallelize( + LinearDataGenerator.generateLinearInput( + 6.3, Array(4.7, 7.2), Array(0.9, -1.3), Array(0.7, 1.2), 100, 42, 0.1), 2) + .map(_.asML).toDF() val trainer = new LinearRegression().setSolver("l-bfgs") val lrParamMaps = new ParamGridBuilder() http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala index 9971371..87100ae 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala @@ -33,9 +33,11 @@ import org.apache.spark.sql.types.StructType class TrainValidationSplitSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { + + import testImplicits._ + test("train validation with logistic regression") { - val dataset = spark.createDataFrame( - sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2)) + val dataset = sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2).toDF() val lr = new LogisticRegression val lrParamMaps = new ParamGridBuilder() @@ -58,9 +60,10 @@ class TrainValidationSplitSuite } test("train validation with linear regression") { - val dataset = spark.createDataFrame( - sc.parallelize(LinearDataGenerator.generateLinearInput( - 6.3, Array(4.7, 7.2), Array(0.9, -1.3), Array(0.7, 1.2), 100, 42, 0.1), 2).map(_.asML)) + val dataset = sc.parallelize( + LinearDataGenerator.generateLinearInput( + 6.3, Array(4.7, 7.2), Array(0.9, -1.3), Array(0.7, 1.2), 100, 42, 0.1), 2) + .map(_.asML).toDF() val trainer = new LinearRegression().setSolver("l-bfgs") val lrParamMaps = new ParamGridBuilder() http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala index 6aa93c9..e4e9be3 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala @@ -37,6 +37,8 @@ import org.apache.spark.util.Utils class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext { + import testImplicits._ + test("epsilon computation") { assert(1.0 + EPSILON > 1.0, s"EPSILON is too small: $EPSILON.") assert(1.0 + EPSILON / 2.0 === 1.0, s"EPSILON is too big: $EPSILON.") @@ -255,9 +257,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext { val z = Vectors.dense(4.0) val p = (5.0, z) val w = Vectors.dense(6.0).asML - val df = spark.createDataFrame(Seq( - (0, x, y, p, w) - )).toDF("id", "x", "y", "p", "w") + val df = Seq((0, x, y, p, w)).toDF("id", "x", "y", "p", "w") .withColumn("x", col("x"), metadata) val newDF1 = convertVectorColumnsToML(df) assert(newDF1.schema("x").metadata === metadata, "Metadata should be preserved.") @@ -282,9 +282,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext { val z = Vectors.dense(4.0).asML val p = (5.0, z) val w = Vectors.dense(6.0) - val df = spark.createDataFrame(Seq( - (0, x, y, p, w) - )).toDF("id", "x", "y", "p", "w") + val df = Seq((0, x, y, p, w)).toDF("id", "x", "y", "p", "w") .withColumn("x", col("x"), metadata) val newDF1 = convertVectorColumnsFromML(df) assert(newDF1.schema("x").metadata === metadata, "Metadata should be preserved.") @@ -309,9 +307,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext { val z = Matrices.ones(1, 1) val p = (5.0, z) val w = Matrices.dense(1, 1, Array(4.5)).asML - val df = spark.createDataFrame(Seq( - (0, x, y, p, w) - )).toDF("id", "x", "y", "p", "w") + val df = Seq((0, x, y, p, w)).toDF("id", "x", "y", "p", "w") .withColumn("x", col("x"), metadata) val newDF1 = convertMatrixColumnsToML(df) assert(newDF1.schema("x").metadata === metadata, "Metadata should be preserved.") @@ -336,9 +332,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext { val z = Matrices.ones(1, 1).asML val p = (5.0, z) val w = Matrices.dense(1, 1, Array(4.5)) - val df = spark.createDataFrame(Seq( - (0, x, y, p, w) - )).toDF("id", "x", "y", "p", "w") + val df = Seq((0, x, y, p, w)).toDF("id", "x", "y", "p", "w") .withColumn("x", col("x"), metadata) val newDF1 = convertMatrixColumnsFromML(df) assert(newDF1.schema("x").metadata === metadata, "Metadata should be preserved.") http://git-wip-us.apache.org/repos/asf/spark/blob/f234b7cd/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala index db56aff..6bb7ed9 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLlibTestSparkContext.scala @@ -23,7 +23,7 @@ import org.scalatest.Suite import org.apache.spark.SparkContext import org.apache.spark.ml.util.TempDirectory -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{SparkSession, SQLContext, SQLImplicits} import org.apache.spark.util.Utils trait MLlibTestSparkContext extends TempDirectory { self: Suite => @@ -55,4 +55,15 @@ trait MLlibTestSparkContext extends TempDirectory { self: Suite => super.afterAll() } } + + /** + * A helper object for importing SQL implicits. + * + * Note that the alternative of importing `spark.implicits._` is not possible here. + * This is because we create the [[SQLContext]] immediately before the first test is run, + * but the implicits import is needed in the constructor. + */ + protected object testImplicits extends SQLImplicits { + protected override def _sqlContext: SQLContext = self.spark.sqlContext + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
