Repository: spark
Updated Branches:
  refs/heads/master 6e19f7683 -> a471880af


[SPARK-24026][ML] Add Power Iteration Clustering to spark.ml

## What changes were proposed in this pull request?

This PR adds PowerIterationClustering as a Transformer to spark.ml. The transform 
method calls spark.mllib's PowerIterationClustering.run() and converts the returned 
assignments (the k-means clustering of the pseudo-eigenvector) into a DataFrame 
(id: LongType, cluster: IntegerType).
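
For context, a minimal usage sketch of the new API (the four-node graph below is
made up for illustration; column names are the defaults, and a live SparkSession
`spark`, as in spark-shell, is assumed):

```scala
import org.apache.spark.ml.clustering.PowerIterationClustering

// A toy symmetric affinity matrix in the adjacency-list form the transformer
// expects: (id, neighbors, similarities). Each edge appears from both
// endpoints, and all weights are non-negative.
val data = spark.createDataFrame(Seq(
  (0L, Array(1L), Array(0.9)),
  (1L, Array(0L, 2L), Array(0.9, 0.1)),
  (2L, Array(1L, 3L), Array(0.1, 0.9)),
  (3L, Array(2L), Array(0.9))
)).toDF("id", "neighbors", "similarities")

val pic = new PowerIterationClustering()
  .setK(2)
  .setMaxIter(20)

// transform() runs the full iterative algorithm and appends a "prediction"
// column with cluster assignments in [0, k).
pic.transform(data).show()
```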

This PR is copied and modified from https://github.com/apache/spark/pull/15770.
The primary author is wangmiao1981.

## How was this patch tested?

This PR has 2 types of tests:
* Copies of tests from spark.mllib's PIC tests
* New tests specific to the spark.ml APIs

Author: [email protected] <[email protected]>
Author: wangmiao1981 <[email protected]>
Author: Joseph K. Bradley <[email protected]>

Closes #21090 from jkbradley/wangmiao1981-pic.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a471880a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a471880a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a471880a

Branch: refs/heads/master
Commit: a471880afbeafd4ef54c15a97e72ea7ff784a88d
Parents: 6e19f76
Author: [email protected] <[email protected]>
Authored: Thu Apr 19 09:40:20 2018 -0700
Committer: Joseph K. Bradley <[email protected]>
Committed: Thu Apr 19 09:40:20 2018 -0700

----------------------------------------------------------------------
 .../clustering/PowerIterationClustering.scala   | 256 +++++++++++++++++++
 .../PowerIterationClusteringSuite.scala         | 238 +++++++++++++++++
 2 files changed, 494 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a471880a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
new file mode 100644
index 0000000..2c30a1d
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.clustering
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.Transformer
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.util._
+import org.apache.spark.mllib.clustering.{PowerIterationClustering => MLlibPowerIterationClustering}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types._
+
+/**
+ * Common params for PowerIterationClustering
+ */
+private[clustering] trait PowerIterationClusteringParams extends Params with HasMaxIter
+  with HasPredictionCol {
+
+  /**
+   * The number of clusters to create (k). Must be &gt; 1. Default: 2.
+   * @group param
+   */
+  @Since("2.4.0")
+  final val k = new IntParam(this, "k", "The number of clusters to create. " +
+    "Must be > 1.", ParamValidators.gt(1))
+
+  /** @group getParam */
+  @Since("2.4.0")
+  def getK: Int = $(k)
+
+  /**
+   * Param for the initialization algorithm. This can be either "random" to use a random vector
+   * as vertex properties, or "degree" to use a normalized sum of similarities with other vertices.
+   * Default: random.
+   * @group expertParam
+   */
+  @Since("2.4.0")
+  final val initMode = {
+    val allowedParams = ParamValidators.inArray(Array("random", "degree"))
+    new Param[String](this, "initMode", "The initialization algorithm. This 
can be either " +
+      "'random' to use a random vector as vertex properties, or 'degree' to 
use a normalized sum " +
+      "of similarities with other vertices.  Supported options: 'random' and 
'degree'.",
+      allowedParams)
+  }
+
+  /** @group expertGetParam */
+  @Since("2.4.0")
+  def getInitMode: String = $(initMode)
+
+  /**
+   * Param for the name of the input column for vertex IDs.
+   * Default: "id"
+   * @group param
+   */
+  @Since("2.4.0")
+  val idCol = new Param[String](this, "idCol", "Name of the input column for vertex IDs.",
+    (value: String) => value.nonEmpty)
+
+  setDefault(idCol, "id")
+
+  /** @group getParam */
+  @Since("2.4.0")
+  def getIdCol: String = getOrDefault(idCol)
+
+  /**
+   * Param for the name of the input column for neighbors in the adjacency list representation.
+   * Default: "neighbors"
+   * @group param
+   */
+  @Since("2.4.0")
+  val neighborsCol = new Param[String](this, "neighborsCol",
+    "Name of the input column for neighbors in the adjacency list 
representation.",
+    (value: String) => value.nonEmpty)
+
+  setDefault(neighborsCol, "neighbors")
+
+  /** @group getParam */
+  @Since("2.4.0")
+  def getNeighborsCol: String = $(neighborsCol)
+
+  /**
+   * Param for the name of the input column for non-negative edge weights (similarities) in the
+   * adjacency list representation.
+   * Default: "similarities"
+   * @group param
+   */
+  @Since("2.4.0")
+  val similaritiesCol = new Param[String](this, "similaritiesCol",
+    "Name of the input column for neighbors in the adjacency list 
representation.",
+    (value: String) => value.nonEmpty)
+
+  setDefault(similaritiesCol, "similarities")
+
+  /** @group getParam */
+  @Since("2.4.0")
+  def getSimilaritiesCol: String = $(similaritiesCol)
+
+  protected def validateAndTransformSchema(schema: StructType): StructType = {
+    SchemaUtils.checkColumnTypes(schema, $(idCol), Seq(IntegerType, LongType))
+    SchemaUtils.checkColumnTypes(schema, $(neighborsCol),
+      Seq(ArrayType(IntegerType, containsNull = false),
+        ArrayType(LongType, containsNull = false)))
+    SchemaUtils.checkColumnTypes(schema, $(similaritiesCol),
+      Seq(ArrayType(FloatType, containsNull = false),
+        ArrayType(DoubleType, containsNull = false)))
+    SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType)
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by
+ * <a href=http://www.icml2010.org/papers/387.pdf>Lin and Cohen</a>. From the abstract:
+ * PIC finds a very low-dimensional embedding of a dataset using truncated power
+ * iteration on a normalized pair-wise similarity matrix of the data.
+ *
+ * PIC takes an affinity matrix between items (or vertices) as input.  An affinity matrix
+ * is a symmetric matrix whose entries are non-negative similarities between items.
+ * PIC takes this matrix (or graph) as an adjacency matrix.  Specifically, each input row includes:
+ *  - `idCol`: vertex ID
+ *  - `neighborsCol`: neighbors of vertex in `idCol`
+ *  - `similaritiesCol`: non-negative weights (similarities) of edges between the vertex
+ *                       in `idCol` and each neighbor in `neighborsCol`
+ * PIC returns a cluster assignment for each input vertex.  It appends a new column `predictionCol`
+ * containing the cluster assignment in `[0,k)` for each row (vertex).
+ *
+ * Notes:
+ *  - [[PowerIterationClustering]] is a transformer with an expensive [[transform]] operation.
+ *    Transform runs the iterative PIC algorithm to cluster the whole input dataset.
+ *  - Input validation: This validates that similarities are non-negative but does NOT validate
+ *    that the input matrix is symmetric.
+ *
+ * @see <a href=http://en.wikipedia.org/wiki/Spectral_clustering>
+ * Spectral clustering (Wikipedia)</a>
+ */
+@Since("2.4.0")
+@Experimental
+class PowerIterationClustering private[clustering] (
+    @Since("2.4.0") override val uid: String)
+  extends Transformer with PowerIterationClusteringParams with DefaultParamsWritable {
+
+  setDefault(
+    k -> 2,
+    maxIter -> 20,
+    initMode -> "random")
+
+  @Since("2.4.0")
+  def this() = this(Identifiable.randomUID("PowerIterationClustering"))
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setPredictionCol(value: String): this.type = set(predictionCol, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setK(value: Int): this.type = set(k, value)
+
+  /** @group expertSetParam */
+  @Since("2.4.0")
+  def setInitMode(value: String): this.type = set(initMode, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setMaxIter(value: Int): this.type = set(maxIter, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setIdCol(value: String): this.type = set(idCol, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setNeighborsCol(value: String): this.type = set(neighborsCol, value)
+
+  /** @group setParam */
+  @Since("2.4.0")
+  def setSimilaritiesCol(value: String): this.type = set(similaritiesCol, value)
+
+  @Since("2.4.0")
+  override def transform(dataset: Dataset[_]): DataFrame = {
+    transformSchema(dataset.schema, logging = true)
+
+    val sparkSession = dataset.sparkSession
+    val idColValue = $(idCol)
+    val rdd: RDD[(Long, Long, Double)] =
+      dataset.select(
+        col($(idCol)).cast(LongType),
+        col($(neighborsCol)).cast(ArrayType(LongType, containsNull = false)),
+        col($(similaritiesCol)).cast(ArrayType(DoubleType, containsNull = false))
+      ).rdd.flatMap {
+        case Row(id: Long, nbrs: Seq[_], sims: Seq[_]) =>
+          require(nbrs.size == sims.size, s"The length of the neighbor ID list must be " +
+            s"equal to the length of the neighbor similarity list.  Row for ID " +
+            s"$idColValue=$id has neighbor ID list of length ${nbrs.length} but similarity list " +
+            s"of length ${sims.length}.")
+          nbrs.asInstanceOf[Seq[Long]].zip(sims.asInstanceOf[Seq[Double]]).map {
+            case (nbr, similarity) => (id, nbr, similarity)
+          }
+      }
+    val algorithm = new MLlibPowerIterationClustering()
+      .setK($(k))
+      .setInitializationMode($(initMode))
+      .setMaxIterations($(maxIter))
+    val model = algorithm.run(rdd)
+
+    val predictionsRDD: RDD[Row] = model.assignments.map { assignment =>
+      Row(assignment.id, assignment.cluster)
+    }
+
+    val predictionsSchema = StructType(Seq(
+      StructField($(idCol), LongType, nullable = false),
+      StructField($(predictionCol), IntegerType, nullable = false)))
+    val predictions = {
+      val uncastPredictions = sparkSession.createDataFrame(predictionsRDD, predictionsSchema)
+      dataset.schema($(idCol)).dataType match {
+        case _: LongType =>
+          uncastPredictions
+        case otherType =>
+          uncastPredictions.select(col($(idCol)).cast(otherType).alias($(idCol)))
+      }
+    }
+
+    dataset.join(predictions, $(idCol))
+  }
+
+  @Since("2.4.0")
+  override def transformSchema(schema: StructType): StructType = {
+    validateAndTransformSchema(schema)
+  }
+
+  @Since("2.4.0")
+  override def copy(extra: ParamMap): PowerIterationClustering = defaultCopy(extra)
+}
+
+@Since("2.4.0")
+object PowerIterationClustering extends DefaultParamsReadable[PowerIterationClustering] {
+
+  @Since("2.4.0")
+  override def load(path: String): PowerIterationClustering = super.load(path)
+}
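
For reviewers, a sketch of what the flatMap in transform() above does: each
adjacency-list row expands into one (srcId, dstId, similarity) triple per
neighbor, which is the edge format spark.mllib's run() expects. The row values
here are made up for illustration:

```scala
// One input row: vertex 1 with neighbors [0, 2] and weights [0.9, 0.1].
val id = 1L
val nbrs = Seq(0L, 2L)
val sims = Seq(0.9, 0.1)

// Zipping yields one edge triple per neighbor, mirroring the flatMap above.
val edges = nbrs.zip(sims).map { case (nbr, similarity) => (id, nbr, similarity) }
// edges == Seq((1L, 0L, 0.9), (1L, 2L, 0.1))
```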

http://git-wip-us.apache.org/repos/asf/spark/blob/a471880a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala
new file mode 100644
index 0000000..65328df
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.clustering
+
+import scala.collection.mutable
+
+import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.ml.util.DefaultReadWriteTest
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types._
+
+
+class PowerIterationClusteringSuite extends SparkFunSuite
+  with MLlibTestSparkContext with DefaultReadWriteTest {
+
+  @transient var data: Dataset[_] = _
+  final val r1 = 1.0
+  final val n1 = 10
+  final val r2 = 4.0
+  final val n2 = 40
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    data = PowerIterationClusteringSuite.generatePICData(spark, r1, r2, n1, n2)
+  }
+
+  test("default parameters") {
+    val pic = new PowerIterationClustering()
+
+    assert(pic.getK === 2)
+    assert(pic.getMaxIter === 20)
+    assert(pic.getInitMode === "random")
+    assert(pic.getPredictionCol === "prediction")
+    assert(pic.getIdCol === "id")
+    assert(pic.getNeighborsCol === "neighbors")
+    assert(pic.getSimilaritiesCol === "similarities")
+  }
+
+  test("parameter validation") {
+    intercept[IllegalArgumentException] {
+      new PowerIterationClustering().setK(1)
+    }
+    intercept[IllegalArgumentException] {
+      new PowerIterationClustering().setInitMode("no_such_a_mode")
+    }
+    intercept[IllegalArgumentException] {
+      new PowerIterationClustering().setIdCol("")
+    }
+    intercept[IllegalArgumentException] {
+      new PowerIterationClustering().setNeighborsCol("")
+    }
+    intercept[IllegalArgumentException] {
+      new PowerIterationClustering().setSimilaritiesCol("")
+    }
+  }
+
+  test("power iteration clustering") {
+    val n = n1 + n2
+
+    val model = new PowerIterationClustering()
+      .setK(2)
+      .setMaxIter(40)
+    val result = model.transform(data)
+
+    val predictions = Array.fill(2)(mutable.Set.empty[Long])
+    result.select("id", "prediction").collect().foreach {
+      case Row(id: Long, cluster: Integer) => predictions(cluster) += id
+    }
+    assert(predictions.toSet == Set((1 until n1).toSet, (n1 until n).toSet))
+
+    val result2 = new PowerIterationClustering()
+      .setK(2)
+      .setMaxIter(10)
+      .setInitMode("degree")
+      .transform(data)
+    val predictions2 = Array.fill(2)(mutable.Set.empty[Long])
+    result2.select("id", "prediction").collect().foreach {
+      case Row(id: Long, cluster: Integer) => predictions2(cluster) += id
+    }
+    assert(predictions2.toSet == Set((1 until n1).toSet, (n1 until n).toSet))
+  }
+
+  test("supported input types") {
+    val model = new PowerIterationClustering()
+      .setK(2)
+      .setMaxIter(1)
+
+    def runTest(idType: DataType, neighborType: DataType, similarityType: DataType): Unit = {
+      val typedData = data.select(
+        col("id").cast(idType).alias("id"),
+        col("neighbors").cast(ArrayType(neighborType, containsNull = 
false)).alias("neighbors"),
+        col("similarities").cast(ArrayType(similarityType, containsNull = 
false))
+          .alias("similarities")
+      )
+      model.transform(typedData).collect()
+    }
+
+    for (idType <- Seq(IntegerType, LongType)) {
+      runTest(idType, LongType, DoubleType)
+    }
+    for (neighborType <- Seq(IntegerType, LongType)) {
+      runTest(LongType, neighborType, DoubleType)
+    }
+    for (similarityType <- Seq(FloatType, DoubleType)) {
+      runTest(LongType, LongType, similarityType)
+    }
+  }
+
+  test("invalid input: wrong types") {
+    val model = new PowerIterationClustering()
+      .setK(2)
+      .setMaxIter(1)
+    intercept[IllegalArgumentException] {
+      val typedData = data.select(
+        col("id").cast(DoubleType).alias("id"),
+        col("neighbors"),
+        col("similarities")
+      )
+      model.transform(typedData)
+    }
+    intercept[IllegalArgumentException] {
+      val typedData = data.select(
+        col("id"),
+        col("neighbors").cast(ArrayType(DoubleType, containsNull = 
false)).alias("neighbors"),
+        col("similarities")
+      )
+      model.transform(typedData)
+    }
+    intercept[IllegalArgumentException] {
+      val typedData = data.select(
+        col("id"),
+        col("neighbors"),
+        col("neighbors").alias("similarities")
+      )
+      model.transform(typedData)
+    }
+  }
+
+  test("invalid input: negative similarity") {
+    val model = new PowerIterationClustering()
+      .setMaxIter(1)
+    val badData = spark.createDataFrame(Seq(
+      (0, Array(1), Array(-1.0)),
+      (1, Array(0), Array(-1.0))
+    )).toDF("id", "neighbors", "similarities")
+    val msg = intercept[SparkException] {
+      model.transform(badData)
+    }.getCause.getMessage
+    assert(msg.contains("Similarity must be nonnegative"))
+  }
+
+  test("invalid input: mismatched lengths for neighbor and similarity arrays") 
{
+    val model = new PowerIterationClustering()
+      .setMaxIter(1)
+    val badData = spark.createDataFrame(Seq(
+      (0, Array(1), Array(0.5)),
+      (1, Array(0, 2), Array(0.5)),
+      (2, Array(1), Array(0.5))
+    )).toDF("id", "neighbors", "similarities")
+    val msg = intercept[SparkException] {
+      model.transform(badData)
+    }.getCause.getMessage
+    assert(msg.contains("The length of the neighbor ID list must be equal to 
the the length of " +
+      "the neighbor similarity list."))
+    assert(msg.contains(s"Row for ID ${model.getIdCol}=1"))
+  }
+
+  test("read/write") {
+    val t = new PowerIterationClustering()
+      .setK(4)
+      .setMaxIter(100)
+      .setInitMode("degree")
+      .setIdCol("test_id")
+      .setNeighborsCol("myNeighborsCol")
+      .setSimilaritiesCol("mySimilaritiesCol")
+      .setPredictionCol("test_prediction")
+    testDefaultReadWrite(t)
+  }
+}
+
+object PowerIterationClusteringSuite {
+
+  /** Generates a circle of points. */
+  private def genCircle(r: Double, n: Int): Array[(Double, Double)] = {
+    Array.tabulate(n) { i =>
+      val theta = 2.0 * math.Pi * i / n
+      (r * math.cos(theta), r * math.sin(theta))
+    }
+  }
+
+  /** Computes Gaussian similarity. */
+  private def sim(x: (Double, Double), y: (Double, Double)): Double = {
+    val dist2 = (x._1 - y._1) * (x._1 - y._1) + (x._2 - y._2) * (x._2 - y._2)
+    math.exp(-dist2 / 2.0)
+  }
+
+  def generatePICData(
+      spark: SparkSession,
+      r1: Double,
+      r2: Double,
+      n1: Int,
+      n2: Int): DataFrame = {
+    // Generate two circles following the example in the PIC paper.
+    val n = n1 + n2
+    val points = genCircle(r1, n1) ++ genCircle(r2, n2)
+
+    val rows = for (i <- 1 until n) yield {
+      val neighbors = for (j <- 0 until i) yield {
+        j.toLong
+      }
+      val similarities = for (j <- 0 until i) yield {
+        sim(points(i), points(j))
+      }
+      (i.toLong, neighbors.toArray, similarities.toArray)
+    }
+
+    spark.createDataFrame(rows).toDF("id", "neighbors", "similarities")
+  }
+
+}
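
A usage note on the persistence support (DefaultParamsWritable/DefaultParamsReadable)
exercised by the read/write test above; a hedged round-trip sketch, where the save
path is hypothetical and a live SparkSession is assumed:

```scala
import org.apache.spark.ml.clustering.PowerIterationClustering

// Sketch: persist the transformer's params and load them back.
val pic = new PowerIterationClustering()
  .setK(4)
  .setInitMode("degree")

pic.write.overwrite().save("/tmp/pic-params")  // hypothetical path
val loaded = PowerIterationClustering.load("/tmp/pic-params")
assert(loaded.getK == 4 && loaded.getInitMode == "degree")
```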

