Repository: spark
Updated Branches:
  refs/heads/master 05d051293 -> 4e0fb010c


[SPARK-23217][ML] Add cosine distance measure to ClusteringEvaluator

## What changes were proposed in this pull request?

The PR provides an implementation of ClusteringEvaluator using the cosine 
distance measure.
This makes it possible to evaluate clustering results created using the cosine 
distance, which was introduced in SPARK-22119.

In the corresponding JIRA, there is a design document for the algorithm 
implemented here.
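
As a usage illustration (not part of this patch; the input `dataset` and the column names are placeholders), the new option pairs naturally with the cosine distance support that SPARK-22119 added to KMeans:

```scala
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// `dataset` is assumed to be a DataFrame with a vector column named "features"
val kmeans = new KMeans()
  .setK(3)
  .setDistanceMeasure("cosine")   // clustering with cosine distance (SPARK-22119)
val predictions = kmeans.fit(dataset).transform(dataset)

// evaluate the clustering with the matching distance measure
val evaluator = new ClusteringEvaluator()
  .setFeaturesCol("features")
  .setPredictionCol("prediction")
  .setDistanceMeasure("cosine")   // new in this patch; default is "squaredEuclidean"

println(evaluator.evaluate(predictions))  // mean Silhouette over all points
```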

## How was this patch tested?

Added a unit test which compares the result to the one computed by Python's 
scikit-learn.

Author: Marco Gaido <marcogaid...@gmail.com>

Closes #20396 from mgaido91/SPARK-23217.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e0fb010
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e0fb010
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e0fb010

Branch: refs/heads/master
Commit: 4e0fb010ccdf13fe411f2a4796bbadc385b01520
Parents: 05d0512
Author: Marco Gaido <marcogaid...@gmail.com>
Authored: Tue Feb 13 11:51:19 2018 -0600
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Feb 13 11:51:19 2018 -0600

----------------------------------------------------------------------
 .../ml/evaluation/ClusteringEvaluator.scala     | 334 +++++++++++++++----
 .../evaluation/ClusteringEvaluatorSuite.scala   |  32 +-
 2 files changed, 300 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4e0fb010/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala
index d6ec522..8d4ae56 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala
@@ -20,11 +20,12 @@ package org.apache.spark.ml.evaluation
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vector, Vectors, VectorUDT}
+import org.apache.spark.ml.linalg.{BLAS, DenseVector, SparseVector, Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators}
 import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol}
-import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils}
+import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable,
+  SchemaUtils}
+import org.apache.spark.sql.{Column, DataFrame, Dataset}
 import org.apache.spark.sql.functions.{avg, col, udf}
 import org.apache.spark.sql.types.DoubleType
 
@@ -32,15 +33,11 @@ import org.apache.spark.sql.types.DoubleType
  * :: Experimental ::
  *
  * Evaluator for clustering results.
- * The metric computes the Silhouette measure
- * using the squared Euclidean distance.
- *
- * The Silhouette is a measure for the validation
- * of the consistency within clusters. It ranges
- * between 1 and -1, where a value close to 1
- * means that the points in a cluster are close
- * to the other points in the same cluster and
- * far from the points of the other clusters.
+ * The metric computes the Silhouette measure using the specified distance measure.
+ *
+ * The Silhouette is a measure for the validation of the consistency within clusters. It ranges
+ * between 1 and -1, where a value close to 1 means that the points in a cluster are close to the
+ * other points in the same cluster and far from the points of the other clusters.
  */
 @Experimental
 @Since("2.3.0")
@@ -84,18 +81,40 @@ class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: Str
   @Since("2.3.0")
   def setMetricName(value: String): this.type = set(metricName, value)
 
-  setDefault(metricName -> "silhouette")
+  /**
+   * param for distance measure to be used in evaluation
+   * (supports `"squaredEuclidean"` (default), `"cosine"`)
+   * @group param
+   */
+  @Since("2.4.0")
+  val distanceMeasure: Param[String] = {
+    val availableValues = Array("squaredEuclidean", "cosine")
+    val allowedParams = ParamValidators.inArray(availableValues)
+    new Param(this, "distanceMeasure", "distance measure in evaluation. Supported options: " +
+      availableValues.mkString("'", "', '", "'"), allowedParams)
+  }
+
+  /** @group getParam */
+  @Since("2.4.0")
+  def getDistanceMeasure: String = $(distanceMeasure)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setDistanceMeasure(value: String): this.type = set(distanceMeasure, value)
+
+  setDefault(metricName -> "silhouette", distanceMeasure -> "squaredEuclidean")
 
   @Since("2.3.0")
   override def evaluate(dataset: Dataset[_]): Double = {
     SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
     SchemaUtils.checkNumericType(dataset.schema, $(predictionCol))
 
-    $(metricName) match {
-      case "silhouette" =>
+    ($(metricName), $(distanceMeasure)) match {
+      case ("silhouette", "squaredEuclidean") =>
         SquaredEuclideanSilhouette.computeSilhouetteScore(
-          dataset, $(predictionCol), $(featuresCol)
-      )
+          dataset, $(predictionCol), $(featuresCol))
+      case ("silhouette", "cosine") =>
+        CosineSilhouette.computeSilhouetteScore(dataset, $(predictionCol), $(featuresCol))
     }
   }
 }
@@ -111,6 +130,48 @@ object ClusteringEvaluator
 }
 
 
+private[evaluation] abstract class Silhouette {
+
+  /**
+   * It computes the Silhouette coefficient for a point.
+   */
+  def pointSilhouetteCoefficient(
+      clusterIds: Set[Double],
+      pointClusterId: Double,
+      pointClusterNumOfPoints: Long,
+      averageDistanceToCluster: (Double) => Double): Double = {
+    // Here we compute the average dissimilarity of the current point to any cluster of which the
+    // point is not a member.
+    // The cluster with the lowest average dissimilarity - i.e. the nearest cluster to the current
+    // point - is said to be the "neighboring cluster".
+    val otherClusterIds = clusterIds.filter(_ != pointClusterId)
+    val neighboringClusterDissimilarity = otherClusterIds.map(averageDistanceToCluster).min
+
+    // adjustment for excluding the node itself from the computation of the average dissimilarity
+    val currentClusterDissimilarity = if (pointClusterNumOfPoints == 1) {
+      0.0
+    } else {
+      averageDistanceToCluster(pointClusterId) * pointClusterNumOfPoints /
+        (pointClusterNumOfPoints - 1)
+    }
+
+    if (currentClusterDissimilarity < neighboringClusterDissimilarity) {
+      1 - (currentClusterDissimilarity / neighboringClusterDissimilarity)
+    } else if (currentClusterDissimilarity > neighboringClusterDissimilarity) {
+      (neighboringClusterDissimilarity / currentClusterDissimilarity) - 1
+    } else {
+      0.0
+    }
+  }
+
+  /**
+   * Compute the mean Silhouette values of all samples.
+   */
+  def overallScore(df: DataFrame, scoreColumn: Column): Double = {
+    df.select(avg(scoreColumn)).collect()(0).getDouble(0)
+  }
+}
+
 /**
  * SquaredEuclideanSilhouette computes the average of the
  * Silhouette over all the data of the dataset, which is
@@ -259,7 +320,7 @@ object ClusteringEvaluator
  * `N` is the number of points in the dataset and `W` is the number
  * of worker nodes.
  */
-private[evaluation] object SquaredEuclideanSilhouette {
+private[evaluation] object SquaredEuclideanSilhouette extends Silhouette {
 
   private[this] var kryoRegistrationPerformed: Boolean = false
 
@@ -336,18 +397,19 @@ private[evaluation] object SquaredEuclideanSilhouette {
    * It computes the Silhouette coefficient for a point.
    *
    * @param broadcastedClustersMap A map of the precomputed values for each cluster.
-   * @param features The [[org.apache.spark.ml.linalg.Vector]] representing the current point.
+   * @param point The [[org.apache.spark.ml.linalg.Vector]] representing the current point.
    * @param clusterId The id of the cluster the current point belongs to.
    * @param squaredNorm The `$\Xi_{X}$` (which is the squared norm) precomputed for the point.
    * @return The Silhouette for the point.
    */
   def computeSilhouetteCoefficient(
      broadcastedClustersMap: Broadcast[Map[Double, ClusterStats]],
-     features: Vector,
+     point: Vector,
      clusterId: Double,
      squaredNorm: Double): Double = {
 
-    def compute(squaredNorm: Double, point: Vector, clusterStats: ClusterStats): Double = {
+    def compute(targetClusterId: Double): Double = {
+      val clusterStats = broadcastedClustersMap.value(targetClusterId)
       val pointDotClusterFeaturesSum = BLAS.dot(point, clusterStats.featureSum)
 
       squaredNorm +
@@ -355,41 +417,14 @@ private[evaluation] object SquaredEuclideanSilhouette {
         2 * pointDotClusterFeaturesSum / clusterStats.numOfPoints
     }
 
-    // Here we compute the average dissimilarity of the
-    // current point to any cluster of which the point
-    // is not a member.
-    // The cluster with the lowest average dissimilarity
-    // - i.e. the nearest cluster to the current point -
-    // is said to be the "neighboring cluster".
-    var neighboringClusterDissimilarity = Double.MaxValue
-    broadcastedClustersMap.value.keySet.foreach {
-      c =>
-        if (c != clusterId) {
-          val dissimilarity = compute(squaredNorm, features, broadcastedClustersMap.value(c))
-          if(dissimilarity < neighboringClusterDissimilarity) {
-            neighboringClusterDissimilarity = dissimilarity
-          }
-        }
-    }
-    val currentCluster = broadcastedClustersMap.value(clusterId)
-    // adjustment for excluding the node itself from
-    // the computation of the average dissimilarity
-    val currentClusterDissimilarity = if (currentCluster.numOfPoints == 1) {
-      0
-    } else {
-      compute(squaredNorm, features, currentCluster) * currentCluster.numOfPoints /
-        (currentCluster.numOfPoints - 1)
-    }
-
-    (currentClusterDissimilarity compare neighboringClusterDissimilarity).signum match {
-      case -1 => 1 - (currentClusterDissimilarity / neighboringClusterDissimilarity)
-      case 1 => (neighboringClusterDissimilarity / currentClusterDissimilarity) - 1
-      case 0 => 0.0
-    }
+    pointSilhouetteCoefficient(broadcastedClustersMap.value.keySet,
+      clusterId,
+      broadcastedClustersMap.value(clusterId).numOfPoints,
+      compute)
   }
 
   /**
-   * Compute the mean Silhouette values of all samples.
+   * Compute the Silhouette score of the dataset using the squared Euclidean distance measure.
    *
    * @param dataset The input dataset (previously clustered) on which to compute the Silhouette.
    * @param predictionCol The name of the column which contains the predicted cluster id
@@ -412,7 +447,7 @@ private[evaluation] object SquaredEuclideanSilhouette {
     val clustersStatsMap = SquaredEuclideanSilhouette
       .computeClusterStats(dfWithSquaredNorm, predictionCol, featuresCol)
 
-    // Silhouette is reasonable only when the number of clusters is grater then 1
+    // Silhouette is reasonable only when the number of clusters is greater than 1
     assert(clustersStatsMap.size > 1, "Number of clusters must be greater than one.")
 
     val bClustersStatsMap = dataset.sparkSession.sparkContext.broadcast(clustersStatsMap)
@@ -421,13 +456,190 @@ private[evaluation] object SquaredEuclideanSilhouette {
       computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double, _: Double)
     }
 
-    val silhouetteScore = dfWithSquaredNorm
-      .select(avg(
-        computeSilhouetteCoefficientUDF(
-          col(featuresCol), col(predictionCol).cast(DoubleType), col("squaredNorm"))
-      ))
-      .collect()(0)
-      .getDouble(0)
+    val silhouetteScore = overallScore(dfWithSquaredNorm,
+      computeSilhouetteCoefficientUDF(col(featuresCol), col(predictionCol).cast(DoubleType),
+        col("squaredNorm")))
+
+    bClustersStatsMap.destroy()
+
+    silhouetteScore
+  }
+}
+
+
+/**
+ * The algorithm implemented in this object is an efficient and parallel implementation of
+ * the Silhouette using the cosine distance measure. The cosine distance measure is defined
+ * as `1 - s` where `s` is the cosine similarity between two points.
+ *
+ * The total distance of the point `X` to the points `$C_{i}$` belonging to the cluster
+ * `$\Gamma$` is:
+ *
+ * <blockquote>
+ *   $$
+ *   \sum\limits_{i=1}^N d(X, C_{i} ) =
+ *   \sum\limits_{i=1}^N \Big( 1 - \frac{\sum\limits_{j=1}^D x_{j}c_{ij} }{ \|X\|\|C_{i}\|} \Big)
+ *   = \sum\limits_{i=1}^N 1 - \sum\limits_{i=1}^N \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|}
+ *   \frac{c_{ij}}{\|C_{i}\|}
+ *   = N - \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|} \Big( \sum\limits_{i=1}^N
+ *   \frac{c_{ij}}{\|C_{i}\|} \Big)
+ *   $$
+ * </blockquote>
+ *
+ * where `$x_{j}$` is the `j`-th dimension of the point `X` and `$c_{ij}$` is the `j`-th
+ * dimension of the `i`-th point in cluster `$\Gamma$`.
+ *
+ * Then, we can define the vector:
+ *
+ * <blockquote>
+ *   $$
+ *   \xi_{X} : \xi_{X i} = \frac{x_{i}}{\|X\|}, i = 1, ..., D
+ *   $$
+ * </blockquote>
+ *
+ * which can be precomputed for each point and the vector
+ *
+ * <blockquote>
+ *   $$
+ *   \Omega_{\Gamma} : \Omega_{\Gamma i} = \sum\limits_{j=1}^N \xi_{C_{j}i}, i = 1, ..., D
+ *   $$
+ * </blockquote>
+ *
+ * which can also be precomputed for each cluster `$\Gamma$` from its points `$C_{i}$`.
+ *
+ * With these definitions, the total distance above (the numerator of the average) becomes:
+ *
+ * <blockquote>
+ *   $$
+ *   N - \sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}
+ *   $$
+ * </blockquote>
+ *
+ * Thus the average distance of a point `X` to the points of the cluster `$\Gamma$` is:
+ *
+ * <blockquote>
+ *   $$
+ *   1 - \frac{\sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}}{N}
+ *   $$
+ * </blockquote>
+ *
+ * In the implementation, the precomputed values for the clusters are distributed among the
+ * worker nodes via broadcasted variables, because we can assume that the clusters are limited
+ * in number.
+ *
+ * The main strengths of this algorithm are the low computational complexity and the intrinsic
+ * parallelism. The precomputed information for each point and for each cluster can be computed
+ * with a computational complexity which is `O(N/W)`, where `N` is the number of points in the
+ * dataset and `W` is the number of worker nodes. After that, every point can be analyzed
+ * independently from the others.
+ *
+ * For every point we need to compute the average distance to all the clusters. Since the
+ * formula above requires `O(D)` operations, this phase has a computational complexity which is
+ * `O(C*D*N/W)` where `C` is the number of clusters (which we assume quite low), `D` is the
+ * number of dimensions, `N` is the number of points in the dataset and `W` is the number of
+ * worker nodes.
+ */
+private[evaluation] object CosineSilhouette extends Silhouette {
+
+  private[this] val normalizedFeaturesColName = "normalizedFeatures"
+
+  /**
+   * The method takes the input dataset and computes the aggregated values
+   * about a cluster which are needed by the algorithm.
+   *
+   * @param df The DataFrame which contains the input data
+   * @param predictionCol The name of the column which contains the predicted cluster id
+   *                      for the point.
+   * @return A [[scala.collection.immutable.Map]] which associates each cluster id to
+   *         its statistics (i.e. the precomputed values `N` and `$\Omega_{\Gamma}$`).
+   */
+  def computeClusterStats(df: DataFrame, predictionCol: String): Map[Double, (Vector, Long)] = {
+    val numFeatures = df.select(col(normalizedFeaturesColName)).first().getAs[Vector](0).size
+    val clustersStatsRDD = df.select(
+      col(predictionCol).cast(DoubleType), col(normalizedFeaturesColName))
+      .rdd
+      .map { row => (row.getDouble(0), row.getAs[Vector](1)) }
+      .aggregateByKey[(DenseVector, Long)]((Vectors.zeros(numFeatures).toDense, 0L))(
+      seqOp = {
+        case ((normalizedFeaturesSum: DenseVector, numOfPoints: Long), (normalizedFeatures)) =>
+          BLAS.axpy(1.0, normalizedFeatures, normalizedFeaturesSum)
+          (normalizedFeaturesSum, numOfPoints + 1)
+      },
+      combOp = {
+        case ((normalizedFeaturesSum1, numOfPoints1), (normalizedFeaturesSum2, numOfPoints2)) =>
+          BLAS.axpy(1.0, normalizedFeaturesSum2, normalizedFeaturesSum1)
+          (normalizedFeaturesSum1, numOfPoints1 + numOfPoints2)
+      }
+    )
+
+    clustersStatsRDD
+      .collectAsMap()
+      .toMap
+  }
+
+  /**
+   * It computes the Silhouette coefficient for a point.
+   *
+   * @param broadcastedClustersMap A map of the precomputed values for each cluster.
+   * @param normalizedFeatures The [[org.apache.spark.ml.linalg.Vector]] representing the
+   *                           normalized features of the current point.
+   * @param clusterId The id of the cluster the current point belongs to.
+   */
+  def computeSilhouetteCoefficient(
+      broadcastedClustersMap: Broadcast[Map[Double, (Vector, Long)]],
+      normalizedFeatures: Vector,
+      clusterId: Double): Double = {
+
+    def compute(targetClusterId: Double): Double = {
+      val (normalizedFeatureSum, numOfPoints) = broadcastedClustersMap.value(targetClusterId)
+      1 - BLAS.dot(normalizedFeatures, normalizedFeatureSum) / numOfPoints
+    }
+
+    pointSilhouetteCoefficient(broadcastedClustersMap.value.keySet,
+      clusterId,
+      broadcastedClustersMap.value(clusterId)._2,
+      compute)
+  }
+
+  /**
+   * Compute the Silhouette score of the dataset using the cosine distance measure.
+   *
+   * @param dataset The input dataset (previously clustered) on which to compute the Silhouette.
+   * @param predictionCol The name of the column which contains the predicted cluster id
+   *                      for the point.
+   * @param featuresCol The name of the column which contains the feature vector of the point.
+   * @return The average of the Silhouette values of the clustered data.
+   */
+  def computeSilhouetteScore(
+      dataset: Dataset[_],
+      predictionCol: String,
+      featuresCol: String): Double = {
+    val normalizeFeatureUDF = udf {
+      features: Vector => {
+        val norm = Vectors.norm(features, 2.0)
+        features match {
+          case d: DenseVector => Vectors.dense(d.values.map(_ / norm))
+          case s: SparseVector => Vectors.sparse(s.size, s.indices, s.values.map(_ / norm))
+        }
+      }
+    }
+    val dfWithNormalizedFeatures = dataset.withColumn(normalizedFeaturesColName,
+      normalizeFeatureUDF(col(featuresCol)))
+
+    // compute aggregate values for clusters needed by the algorithm
+    val clustersStatsMap = computeClusterStats(dfWithNormalizedFeatures, predictionCol)
+
+    // Silhouette is reasonable only when the number of clusters is greater than 1
+    assert(clustersStatsMap.size > 1, "Number of clusters must be greater than one.")
+
+    val bClustersStatsMap = dataset.sparkSession.sparkContext.broadcast(clustersStatsMap)
+
+    val computeSilhouetteCoefficientUDF = udf {
+      computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double)
+    }
+
+    val silhouetteScore = overallScore(dfWithNormalizedFeatures,
+      computeSilhouetteCoefficientUDF(col(normalizedFeaturesColName),
+        col(predictionCol).cast(DoubleType)))
 
     bClustersStatsMap.destroy()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4e0fb010/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
index 677ce49..3bf3477 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/evaluation/ClusteringEvaluatorSuite.scala
@@ -66,16 +66,38 @@ class ClusteringEvaluatorSuite
     assert(evaluator.evaluate(irisDataset) ~== 0.6564679231 relTol 1e-5)
   }
 
-  test("number of clusters must be greater than one") {
-    val singleClusterDataset = irisDataset.where($"label" === 0.0)
+  /*
+    Use the following Python code to load the data and evaluate it using the scikit-learn
+    package.
+
+    from sklearn import datasets
+    from sklearn.metrics import silhouette_score
+    iris = datasets.load_iris()
+    round(silhouette_score(iris.data, iris.target, metric='cosine'), 10)
+
+    0.7222369298
+  */
+  test("cosine Silhouette") {
     val evaluator = new ClusteringEvaluator()
       .setFeaturesCol("features")
       .setPredictionCol("label")
+      .setDistanceMeasure("cosine")
+
+    assert(evaluator.evaluate(irisDataset) ~== 0.7222369298 relTol 1e-5)
+  }
+
+  test("number of clusters must be greater than one") {
+    val singleClusterDataset = irisDataset.where($"label" === 0.0)
+    Seq("squaredEuclidean", "cosine").foreach { distanceMeasure =>
+      val evaluator = new ClusteringEvaluator()
+        .setFeaturesCol("features")
+        .setPredictionCol("label")
+        .setDistanceMeasure(distanceMeasure)
 
-    val e = intercept[AssertionError]{
-      evaluator.evaluate(singleClusterDataset)
+      val e = intercept[AssertionError] {
+        evaluator.evaluate(singleClusterDataset)
+      }
+      assert(e.getMessage.contains("Number of clusters must be greater than one"))
     }
-    assert(e.getMessage.contains("Number of clusters must be greater than one"))
   }
 
 }
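
The identity at the heart of the new `CosineSilhouette` implementation, `$\sum_i d(X, C_{i}) = N - \xi_{X} \cdot \Omega_{\Gamma}$`, can be sanity-checked with a few lines of standalone Scala. This is a sketch using plain arrays instead of Spark vectors and is not part of the commit:

```scala
object CosineIdentityCheck extends App {
  def norm(v: Array[Double]): Double = math.sqrt(v.map(x => x * x).sum)
  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => x * y }.sum
  // cosine distance is defined as 1 - cosine similarity
  def cosineDistance(a: Array[Double], b: Array[Double]): Double =
    1.0 - dot(a, b) / (norm(a) * norm(b))

  val x = Array(1.0, 2.0, 0.5)   // the point X
  val cluster = Seq(             // the points C_i of cluster Gamma
    Array(0.5, 1.0, 1.0), Array(2.0, 0.0, 1.0), Array(1.0, 1.0, 1.0))

  // naive form: sum the pairwise cosine distances directly
  val naive = cluster.map(c => cosineDistance(x, c)).sum

  // precomputed form: xi_X = X / ||X||, Omega_Gamma = sum of the normalized cluster points
  val xi = x.map(_ / norm(x))
  val omega = cluster
    .map(c => c.map(_ / norm(c)))
    .reduce((a, b) => a.zip(b).map { case (p, q) => p + q })
  val precomputed = cluster.size - dot(xi, omega)

  println(f"naive = $naive%.12f, precomputed = $precomputed%.12f")  // the two should agree
}
```

Because `$\Omega_{\Gamma}$` is a single `D`-dimensional vector per cluster, each point's average distance to a cluster reduces to one dot product, which is what makes the broadcast-based implementation in the diff cheap.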

