Repository: spark
Updated Branches:
  refs/heads/master 76ecd0950 -> 0d63eb888


[SPARK-23975][ML] Add support of array input for all clustering methods

## What changes were proposed in this pull request?

Add support for all of the clustering methods

## How was this patch tested?

unit tests added

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Lu WANG <[email protected]>

Closes #21195 from ludatabricks/SPARK-23975-1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d63eb88
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d63eb88
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d63eb88

Branch: refs/heads/master
Commit: 0d63eb8888d17df747fb41d7ba254718bb7af3ae
Parents: 76ecd09
Author: Lu WANG <[email protected]>
Authored: Mon May 7 20:08:41 2018 -0700
Committer: Xiangrui Meng <[email protected]>
Committed: Mon May 7 20:08:41 2018 -0700

----------------------------------------------------------------------
 .../spark/ml/clustering/BisectingKMeans.scala   | 21 ++++-----
 .../spark/ml/clustering/GaussianMixture.scala   | 12 +++--
 .../org/apache/spark/ml/clustering/KMeans.scala | 31 +++----------
 .../org/apache/spark/ml/clustering/LDA.scala    |  9 ++--
 .../org/apache/spark/ml/util/DatasetUtils.scala | 13 +++++-
 .../org/apache/spark/ml/util/SchemaUtils.scala  | 16 ++++++-
 .../ml/clustering/BisectingKMeansSuite.scala    | 21 ++++++++-
 .../ml/clustering/GaussianMixtureSuite.scala    | 21 ++++++++-
 .../spark/ml/clustering/KMeansSuite.scala       | 48 ++++++--------------
 .../apache/spark/ml/clustering/LDASuite.scala   | 20 +++++++-
 .../apache/spark/ml/util/MLTestingUtils.scala   | 23 +++++++++-
 11 files changed, 147 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0d63eb88/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala 
b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
index addc12ac..438e53b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
@@ -22,17 +22,15 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.ml.{Estimator, Model}
-import org.apache.spark.ml.linalg.{Vector, VectorUDT}
+import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
 import org.apache.spark.mllib.clustering.{BisectingKMeans => 
MLlibBisectingKMeans,
   BisectingKMeansModel => MLlibBisectingKMeansModel}
-import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => 
OldVectors}
 import org.apache.spark.mllib.linalg.VectorImplicits._
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
-import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.functions.udf
 import org.apache.spark.sql.types.{IntegerType, StructType}
 
 
@@ -75,7 +73,7 @@ private[clustering] trait BisectingKMeansParams extends 
Params with HasMaxIter
    * @return output schema
    */
   protected def validateAndTransformSchema(schema: StructType): StructType = {
-    SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT)
+    SchemaUtils.validateVectorCompatibleColumn(schema, getFeaturesCol)
     SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType)
   }
 }
@@ -113,7 +111,8 @@ class BisectingKMeansModel private[ml] (
   override def transform(dataset: Dataset[_]): DataFrame = {
     transformSchema(dataset.schema, logging = true)
     val predictUDF = udf((vector: Vector) => predict(vector))
-    dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))
+    dataset.withColumn($(predictionCol),
+      predictUDF(DatasetUtils.columnToVector(dataset, getFeaturesCol)))
   }
 
   @Since("2.0.0")
@@ -132,9 +131,9 @@ class BisectingKMeansModel private[ml] (
    */
   @Since("2.0.0")
   def computeCost(dataset: Dataset[_]): Double = {
-    SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
-    val data = dataset.select(col($(featuresCol))).rdd.map { case Row(point: 
Vector) => point }
-    parentModel.computeCost(data.map(OldVectors.fromML))
+    SchemaUtils.validateVectorCompatibleColumn(dataset.schema, getFeaturesCol)
+    val data = DatasetUtils.columnToOldVector(dataset, getFeaturesCol)
+    parentModel.computeCost(data)
   }
 
   @Since("2.0.0")
@@ -260,9 +259,7 @@ class BisectingKMeans @Since("2.0.0") (
   @Since("2.0.0")
   override def fit(dataset: Dataset[_]): BisectingKMeansModel = {
     transformSchema(dataset.schema, logging = true)
-    val rdd: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
-      case Row(point: Vector) => OldVectors.fromML(point)
-    }
+    val rdd = DatasetUtils.columnToOldVector(dataset, getFeaturesCol)
 
     val instr = Instrumentation.create(this, rdd)
     instr.logParams(featuresCol, predictionCol, k, maxIter, seed, 
minDivisibleClusterSize)

http://git-wip-us.apache.org/repos/asf/spark/blob/0d63eb88/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala 
b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
index b580490..88d618c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
@@ -33,7 +33,7 @@ import org.apache.spark.mllib.linalg.{Matrices => 
OldMatrices, Matrix => OldMatr
   Vector => OldVector, Vectors => OldVectors}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.functions.udf
 import org.apache.spark.sql.types.{IntegerType, StructType}
 
 
@@ -63,7 +63,7 @@ private[clustering] trait GaussianMixtureParams extends 
Params with HasMaxIter w
    * @return output schema
    */
   protected def validateAndTransformSchema(schema: StructType): StructType = {
-    SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT)
+    SchemaUtils.validateVectorCompatibleColumn(schema, getFeaturesCol)
     val schemaWithPredictionCol = SchemaUtils.appendColumn(schema, 
$(predictionCol), IntegerType)
     SchemaUtils.appendColumn(schemaWithPredictionCol, $(probabilityCol), new 
VectorUDT)
   }
@@ -109,8 +109,9 @@ class GaussianMixtureModel private[ml] (
     transformSchema(dataset.schema, logging = true)
     val predUDF = udf((vector: Vector) => predict(vector))
     val probUDF = udf((vector: Vector) => predictProbability(vector))
-    dataset.withColumn($(predictionCol), predUDF(col($(featuresCol))))
-      .withColumn($(probabilityCol), probUDF(col($(featuresCol))))
+    dataset
+      .withColumn($(predictionCol), 
predUDF(DatasetUtils.columnToVector(dataset, getFeaturesCol)))
+      .withColumn($(probabilityCol), 
probUDF(DatasetUtils.columnToVector(dataset, getFeaturesCol)))
   }
 
   @Since("2.0.0")
@@ -340,7 +341,8 @@ class GaussianMixture @Since("2.0.0") (
     val sc = dataset.sparkSession.sparkContext
     val numClusters = $(k)
 
-    val instances: RDD[Vector] = dataset.select(col($(featuresCol))).rdd.map {
+    val instances: RDD[Vector] = dataset
+      .select(DatasetUtils.columnToVector(dataset, getFeaturesCol)).rdd.map {
       case Row(features: Vector) => features
     }.cache()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0d63eb88/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala 
b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
index de61c9c..97f246f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.ml.{Estimator, Model, PipelineStage}
-import org.apache.spark.ml.linalg.{Vector, VectorUDT}
+import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
@@ -34,7 +34,7 @@ import org.apache.spark.mllib.linalg.VectorImplicits._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.functions.udf
-import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, 
IntegerType, StructType}
+import org.apache.spark.sql.types.{IntegerType, StructType}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.VersionUtils.majorVersion
 
@@ -87,23 +87,12 @@ private[clustering] trait KMeansParams extends Params with 
HasMaxIter with HasFe
   def getInitSteps: Int = $(initSteps)
 
   /**
-   * Validates the input schema.
-   * @param schema input schema
-   */
-  private[clustering] def validateSchema(schema: StructType): Unit = {
-    val typeCandidates = List( new VectorUDT,
-      new ArrayType(DoubleType, false),
-      new ArrayType(FloatType, false))
-
-    SchemaUtils.checkColumnTypes(schema, $(featuresCol), typeCandidates)
-  }
-  /**
    * Validates and transforms the input schema.
    * @param schema input schema
    * @return output schema
    */
   protected def validateAndTransformSchema(schema: StructType): StructType = {
-    validateSchema(schema)
+    SchemaUtils.validateVectorCompatibleColumn(schema, getFeaturesCol)
     SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType)
   }
 }
@@ -160,12 +149,8 @@ class KMeansModel private[ml] (
   // TODO: Replace the temp fix when we have proper evaluators defined for 
clustering.
   @Since("2.0.0")
   def computeCost(dataset: Dataset[_]): Double = {
-    validateSchema(dataset.schema)
-
-    val data: RDD[OldVector] = 
dataset.select(DatasetUtils.columnToVector(dataset, getFeaturesCol))
-      .rdd.map {
-      case Row(point: Vector) => OldVectors.fromML(point)
-    }
+    SchemaUtils.validateVectorCompatibleColumn(dataset.schema, getFeaturesCol)
+    val data = DatasetUtils.columnToOldVector(dataset, getFeaturesCol)
     parentModel.computeCost(data)
   }
 
@@ -351,11 +336,7 @@ class KMeans @Since("1.5.0") (
     transformSchema(dataset.schema, logging = true)
 
     val handlePersistence = dataset.storageLevel == StorageLevel.NONE
-    val instances: RDD[OldVector] = dataset.select(
-      DatasetUtils.columnToVector(dataset, getFeaturesCol))
-      .rdd.map {
-      case Row(point: Vector) => OldVectors.fromML(point)
-    }
+    val instances = DatasetUtils.columnToOldVector(dataset, getFeaturesCol)
 
     if (handlePersistence) {
       instances.persist(StorageLevel.MEMORY_AND_DISK)

http://git-wip-us.apache.org/repos/asf/spark/blob/0d63eb88/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala 
b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index 4707723..afe599c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -43,7 +43,7 @@ import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.functions.{col, monotonically_increasing_id, udf}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, 
StructType}
 import org.apache.spark.util.PeriodicCheckpointer
 import org.apache.spark.util.VersionUtils
 
@@ -345,7 +345,7 @@ private[clustering] trait LDAParams extends Params with 
HasFeaturesCol with HasM
             s" must be >= 1.  Found value: $getTopicConcentration")
       }
     }
-    SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT)
+    SchemaUtils.validateVectorCompatibleColumn(schema, getFeaturesCol)
     SchemaUtils.appendColumn(schema, $(topicDistributionCol), new VectorUDT)
   }
 
@@ -461,7 +461,8 @@ abstract class LDAModel private[ml] (
       val transformer = oldLocalModel.getTopicDistributionMethod
 
       val t = udf { (v: Vector) => transformer(OldVectors.fromML(v)).asML }
-      dataset.withColumn($(topicDistributionCol), 
t(col($(featuresCol)))).toDF()
+      dataset.withColumn($(topicDistributionCol),
+        t(DatasetUtils.columnToVector(dataset, getFeaturesCol))).toDF()
     } else {
       logWarning("LDAModel.transform was called without any output columns. 
Set an output column" +
         " such as topicDistributionCol to produce results.")
@@ -938,7 +939,7 @@ object LDA extends MLReadable[LDA] {
        featuresCol: String): RDD[(Long, OldVector)] = {
     dataset
       .withColumn("docId", monotonically_increasing_id())
-      .select("docId", featuresCol)
+      .select(col("docId"), DatasetUtils.columnToVector(dataset, featuresCol))
       .rdd
       .map { case Row(docId: Long, features: Vector) =>
         (docId, OldVectors.fromML(features))

http://git-wip-us.apache.org/repos/asf/spark/blob/0d63eb88/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala 
b/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala
index 52619cb..6af4b3e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark.ml.util
 
-import org.apache.spark.ml.linalg.{Vectors, VectorUDT}
-import org.apache.spark.sql.{Column, Dataset}
+import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
+import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => 
OldVectors}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Column, Dataset, Row}
 import org.apache.spark.sql.functions.{col, udf}
 import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType}
 
@@ -60,4 +62,11 @@ private[spark] object DatasetUtils {
         throw new IllegalArgumentException(s"$other column cannot be cast to 
Vector")
     }
   }
+
+  def columnToOldVector(dataset: Dataset[_], colName: String): RDD[OldVector] 
= {
+    dataset.select(columnToVector(dataset, colName))
+      .rdd.map {
+      case Row(point: Vector) => OldVectors.fromML(point)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0d63eb88/mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala 
b/mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala
index 334410c..d9a3f85 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.ml.util
 
-import org.apache.spark.sql.types.{DataType, NumericType, StructField, 
StructType}
+import org.apache.spark.ml.linalg.VectorUDT
+import org.apache.spark.sql.types._
 
 
 /**
@@ -101,4 +102,17 @@ private[spark] object SchemaUtils {
     require(!schema.fieldNames.contains(col.name), s"Column ${col.name} 
already exists.")
     StructType(schema.fields :+ col)
   }
+
+  /**
+   * Check whether the given column in the schema is one of the supporting 
vector type: Vector,
+   * Array[Float]. Array[Double]
+   * @param schema input schema
+   * @param colName column name
+   */
+  def validateVectorCompatibleColumn(schema: StructType, colName: String): 
Unit = {
+    val typeCandidates = List( new VectorUDT,
+      new ArrayType(DoubleType, false),
+      new ArrayType(FloatType, false))
+    checkColumnTypes(schema, colName, typeCandidates)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0d63eb88/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
index 02880f9..f3ff2af 100644
--- 
a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
@@ -17,13 +17,16 @@
 
 package org.apache.spark.ml.clustering
 
+import scala.language.existentials
+
 import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.clustering.DistanceMeasure
 import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.{DataFrame, Dataset}
 
 class BisectingKMeansSuite
   extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
@@ -182,6 +185,22 @@ class BisectingKMeansSuite
 
     model.clusterCenters.forall(Vectors.norm(_, 2) == 1.0)
   }
+
+  test("BisectingKMeans with Array input") {
+    def trainAndComputeCost(dataset: Dataset[_]): Double = {
+      val model = new 
BisectingKMeans().setK(k).setMaxIter(1).setSeed(1).fit(dataset)
+      model.computeCost(dataset)
+    }
+
+    val (newDataset, newDatasetD, newDatasetF) = 
MLTestingUtils.generateArrayFeatureDataset(dataset)
+    val trueCost = trainAndComputeCost(newDataset)
+    val doubleArrayCost = trainAndComputeCost(newDatasetD)
+    val floatArrayCost = trainAndComputeCost(newDatasetF)
+
+    // checking the cost is fine enough as a sanity check
+    assert(trueCost ~== doubleArrayCost absTol 1e-6)
+    assert(trueCost ~== floatArrayCost absTol 1e-6)
+  }
 }
 
 object BisectingKMeansSuite {

http://git-wip-us.apache.org/repos/asf/spark/blob/0d63eb88/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
index 08b800b..d0d461a 100644
--- 
a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.ml.clustering
 
+import scala.language.existentials
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.linalg.{DenseMatrix, Matrices, Vector, Vectors}
 import org.apache.spark.ml.param.ParamMap
@@ -24,8 +26,7 @@ import 
org.apache.spark.ml.stat.distribution.MultivariateGaussian
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.{Dataset, Row}
-
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
 
 class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
   with DefaultReadWriteTest {
@@ -256,6 +257,22 @@ class GaussianMixtureSuite extends SparkFunSuite with 
MLlibTestSparkContext
     val expectedMatrix = GaussianMixture.unpackUpperTriangularMatrix(4, 
triangularValues)
     assert(symmetricMatrix === expectedMatrix)
   }
+
+  test("GaussianMixture with Array input") {
+    def trainAndComputlogLikelihood(dataset: Dataset[_]): Double = {
+      val model = new 
GaussianMixture().setK(k).setMaxIter(1).setSeed(1).fit(dataset)
+      model.summary.logLikelihood
+    }
+
+    val (newDataset, newDatasetD, newDatasetF) = 
MLTestingUtils.generateArrayFeatureDataset(dataset)
+    val trueLikelihood = trainAndComputlogLikelihood(newDataset)
+    val doubleLikelihood = trainAndComputlogLikelihood(newDatasetD)
+    val floatLikelihood = trainAndComputlogLikelihood(newDatasetF)
+
+    // checking the cost is fine enough as a sanity check
+    assert(trueLikelihood ~== doubleLikelihood absTol 1e-6)
+    assert(trueLikelihood ~== floatLikelihood absTol 1e-6)
+  }
 }
 
 object GaussianMixtureSuite extends SparkFunSuite {

http://git-wip-us.apache.org/repos/asf/spark/blob/0d63eb88/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
index 5445ebe..680a7c2 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.ml.clustering
 
+import scala.language.existentials
 import scala.util.Random
 
 import org.dmg.pmml.{ClusteringModel, PMML}
@@ -25,13 +26,11 @@ import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.util._
-import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => 
MLlibKMeans,
-  KMeansModel => MLlibKMeansModel}
+import org.apache.spark.ml.util.TestingUtils._
+import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => 
MLlibKMeans, KMeansModel => MLlibKMeansModel}
 import org.apache.spark.mllib.linalg.{Vectors => MLlibVectors}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, 
IntegerType, StructType}
 
 private[clustering] case class TestRow(features: Vector)
 
@@ -202,38 +201,19 @@ class KMeansSuite extends SparkFunSuite with 
MLlibTestSparkContext with DefaultR
   }
 
   test("KMean with Array input") {
-    val featuresColNameD = "array_double_features"
-    val featuresColNameF = "array_float_features"
-
-    val doubleUDF = udf { (features: Vector) =>
-      val featureArray = Array.fill[Double](features.size)(0.0)
-      features.foreachActive((idx, value) => featureArray(idx) = value.toFloat)
-      featureArray
-    }
-    val floatUDF = udf { (features: Vector) =>
-      val featureArray = Array.fill[Float](features.size)(0.0f)
-      features.foreachActive((idx, value) => featureArray(idx) = value.toFloat)
-      featureArray
+    def trainAndComputeCost(dataset: Dataset[_]): Double = {
+      val model = new KMeans().setK(k).setMaxIter(1).setSeed(1).fit(dataset)
+      model.computeCost(dataset)
     }
 
-    val newdatasetD = dataset.withColumn(featuresColNameD, 
doubleUDF(col("features")))
-      .drop("features")
-    val newdatasetF = dataset.withColumn(featuresColNameF, 
floatUDF(col("features")))
-      .drop("features")
-    assert(newdatasetD.schema(featuresColNameD).dataType.equals(new 
ArrayType(DoubleType, false)))
-    assert(newdatasetF.schema(featuresColNameF).dataType.equals(new 
ArrayType(FloatType, false)))
-
-    val kmeansD = new 
KMeans().setK(k).setMaxIter(1).setFeaturesCol(featuresColNameD).setSeed(1)
-    val kmeansF = new 
KMeans().setK(k).setMaxIter(1).setFeaturesCol(featuresColNameF).setSeed(1)
-    val modelD = kmeansD.fit(newdatasetD)
-    val modelF = kmeansF.fit(newdatasetF)
-    val transformedD = modelD.transform(newdatasetD)
-    val transformedF = modelF.transform(newdatasetF)
-
-    val predictDifference = transformedD.select("prediction")
-      .except(transformedF.select("prediction"))
-    assert(predictDifference.count() == 0)
-    assert(modelD.computeCost(newdatasetD) == modelF.computeCost(newdatasetF) )
+    val (newDataset, newDatasetD, newDatasetF) = 
MLTestingUtils.generateArrayFeatureDataset(dataset)
+    val trueCost = trainAndComputeCost(newDataset)
+    val doubleArrayCost = trainAndComputeCost(newDatasetD)
+    val floatArrayCost = trainAndComputeCost(newDatasetF)
+
+    // checking the cost is fine enough as a sanity check
+    assert(trueCost ~== doubleArrayCost absTol 1e-6)
+    assert(trueCost ~== floatArrayCost absTol 1e-6)
   }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0d63eb88/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
index e73bbc1..8d728f0 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.ml.clustering
 
+import scala.language.existentials
+
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkFunSuite
@@ -26,7 +28,6 @@ import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql._
 
-
 object LDASuite {
   def generateLDAData(
       spark: SparkSession,
@@ -323,4 +324,21 @@ class LDASuite extends SparkFunSuite with 
MLlibTestSparkContext with DefaultRead
       assert(model.getOptimizer === optimizer)
     }
   }
+
+  test("LDA with Array input") {
+    def trainAndLogLikelihoodAndPerplexity(dataset: Dataset[_]): (Double, 
Double) = {
+      val model = new 
LDA().setK(k).setOptimizer("online").setMaxIter(1).setSeed(1).fit(dataset)
+      (model.logLikelihood(dataset), model.logPerplexity(dataset))
+    }
+
+    val (newDataset, newDatasetD, newDatasetF) = 
MLTestingUtils.generateArrayFeatureDataset(dataset)
+    val (ll, lp) = trainAndLogLikelihoodAndPerplexity(newDataset)
+    val (llD, lpD) = trainAndLogLikelihoodAndPerplexity(newDatasetD)
+    val (llF, lpF) = trainAndLogLikelihoodAndPerplexity(newDatasetF)
+    // TODO: need to compare the results once we fix the seed issue for LDA 
(SPARK-22210)
+    assert(llD <= 0.0 && llD != Double.NegativeInfinity)
+    assert(llF <= 0.0 && llF != Double.NegativeInfinity)
+    assert(lpD >= 0.0 && lpD != Double.NegativeInfinity)
+    assert(lpF >= 0.0 && lpF != Double.NegativeInfinity)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0d63eb88/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala 
b/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
index c328d81..5e72b4d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
@@ -21,7 +21,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml._
 import org.apache.spark.ml.evaluation.Evaluator
 import org.apache.spark.ml.feature.{Instance, LabeledPoint}
-import org.apache.spark.ml.linalg.{Vector, Vectors}
+import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasLabelCol, 
HasWeightCol}
 import org.apache.spark.ml.recommendation.{ALS, ALSModel}
@@ -247,4 +247,25 @@ object MLTestingUtils extends SparkFunSuite {
     }
     models.sliding(2).foreach { case Seq(m1, m2) => modelEquals(m1, m2)}
   }
+
+  /**
+   * Helper function for testing different input types for "features" column. 
Given a DataFrame,
+   * generate three output DataFrames: one having vector "features" column 
with float precision,
+   * one having double array "features" column with float precision, and one 
having float array
+   * "features" column.
+   */
+  def generateArrayFeatureDataset(dataset: Dataset[_],
+    featuresColName: String = "features"): (Dataset[_], Dataset[_], 
Dataset[_]) = {
+    val toFloatVectorUDF = udf { (features: Vector) =>
+      Vectors.dense(features.toArray.map(_.toFloat.toDouble))}
+    val toDoubleArrayUDF = udf { (features: Vector) => features.toArray}
+    val toFloatArrayUDF = udf { (features: Vector) => 
features.toArray.map(_.toFloat)}
+    val newDataset = dataset.withColumn(featuresColName, 
toFloatVectorUDF(col(featuresColName)))
+    val newDatasetD = newDataset.withColumn(featuresColName, 
toDoubleArrayUDF(col(featuresColName)))
+    val newDatasetF = newDataset.withColumn(featuresColName, 
toFloatArrayUDF(col(featuresColName)))
+    assert(newDataset.schema(featuresColName).dataType.equals(new VectorUDT))
+    assert(newDatasetD.schema(featuresColName).dataType.equals(new 
ArrayType(DoubleType, false)))
+    assert(newDatasetF.schema(featuresColName).dataType.equals(new 
ArrayType(FloatType, false)))
+    (newDataset, newDatasetD, newDatasetF)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to