Repository: spark
Updated Branches:
  refs/heads/master 1e1f33997 -> c1befd780


[SPARK-8664] [ML] Add PCA transformer

Add a PCA transformer for the ML pipeline API.

Author: Yanbo Liang <[email protected]>

Closes #7065 from yanboliang/spark-8664 and squashes the following commits:

4afae45 [Yanbo Liang] address comments
e9effd7 [Yanbo Liang] Add PCA transformer


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1befd78
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1befd78
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1befd78

Branch: refs/heads/master
Commit: c1befd780c3defc843baa75097de7ec427d3f8ca
Parents: 1e1f339
Author: Yanbo Liang <[email protected]>
Authored: Tue Jun 30 12:23:48 2015 -0700
Committer: Xiangrui Meng <[email protected]>
Committed: Tue Jun 30 12:23:48 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/ml/feature/PCA.scala | 130 +++++++++++++++++++
 .../org/apache/spark/mllib/feature/PCA.scala    |   2 +-
 .../org/apache/spark/ml/feature/PCASuite.scala  |  64 +++++++++
 3 files changed, 195 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c1befd78/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
new file mode 100644
index 0000000..2d3bb68
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.ml._
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.util.Identifiable
+import org.apache.spark.mllib.feature
+import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
+import org.apache.spark.sql._
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * Params for [[PCA]] and [[PCAModel]].
+ *
+ * Both the estimator and the fitted model read and write vectors through the shared
+ * `inputCol` / `outputCol` params; `k` is only consumed by [[PCA.fit]].
+ */
+private[feature] trait PCAParams extends Params with HasInputCol with HasOutputCol {
+
+  /**
+   * The number of principal components to keep.
+   * Must be strictly positive (validated with `ParamValidators.gt(0)`), so that an invalid
+   * value is reported against this param rather than deep inside the spark.mllib fit.
+   * @group param
+   */
+  final val k: IntParam = new IntParam(this, "k", "the number of principal components",
+    ParamValidators.gt(0))
+
+  /**
+   * Returns the configured number of principal components.
+   * @group getParam
+   */
+  def getK: Int = $(k)
+
+}
+
+/**
+ * :: Experimental ::
+ * Estimator that learns the top `k` principal components of a vector column and produces a
+ * [[PCAModel]] which projects vectors onto those components (a lower-dimensional space).
+ */
+@Experimental
+class PCA(override val uid: String) extends Estimator[PCAModel] with PCAParams {
+
+  /** Creates an estimator with a freshly generated uid prefixed by "pca". */
+  def this() = this(Identifiable.randomUID("pca"))
+
+  /** @group setParam */
+  def setInputCol(value: String): this.type = set(inputCol, value)
+
+  /** @group setParam */
+  def setOutputCol(value: String): this.type = set(outputCol, value)
+
+  /** @group setParam */
+  def setK(value: Int): this.type = set(k, value)
+
+  /**
+   * Runs the spark.mllib PCA on the vectors found in the input column and wraps the result
+   * in a [[PCAModel]] that carries this estimator's uid, parent link and param values.
+   */
+  override def fit(dataset: DataFrame): PCAModel = {
+    transformSchema(dataset.schema, logging = true)
+    val vectors = dataset.select($(inputCol)).map { case Row(v: Vector) => v }
+    val mllibModel = new feature.PCA(k = $(k)).fit(vectors)
+    val model = new PCAModel(uid, mllibModel).setParent(this)
+    copyValues(model)
+  }
+
+  /**
+   * Checks that the input column holds vectors and that the output column name is unused,
+   * then returns `schema` with a non-nullable vector output column appended.
+   */
+  override def transformSchema(schema: StructType): StructType = {
+    val isVectorInput = schema($(inputCol)).dataType.isInstanceOf[VectorUDT]
+    require(isVectorInput, s"Input column ${$(inputCol)} must be a vector column")
+    val outputTaken = schema.fieldNames.contains($(outputCol))
+    require(!outputTaken, s"Output column ${$(outputCol)} already exists.")
+    val outputField = StructField($(outputCol), new VectorUDT, nullable = false)
+    StructType(schema.fields :+ outputField)
+  }
+
+  /** Copies this estimator through `defaultCopy`, applying the `extra` params. */
+  override def copy(extra: ParamMap): PCA = defaultCopy(extra)
+}
+
+/**
+ * :: Experimental ::
+ * Model fitted by [[PCA]]. Projects each vector in the input column onto the learned
+ * principal components and stores the projection in the output column.
+ *
+ * @param uid identifier of this model ([[PCA.fit]] passes the estimator's own uid)
+ * @param pcaModel the spark.mllib model that holds the principal components
+ */
+@Experimental
+class PCAModel private[ml] (
+    override val uid: String,
+    pcaModel: feature.PCAModel)
+  extends Model[PCAModel] with PCAParams {
+
+  /** @group setParam */
+  def setInputCol(value: String): this.type = set(inputCol, value)
+
+  /** @group setParam */
+  def setOutputCol(value: String): this.type = set(outputCol, value)
+
+  /**
+   * Transform a vector by computed Principal Components.
+   * NOTE: Vectors to be transformed must be the same length
+   * as the source vectors given to [[PCA.fit()]].
+   */
+  override def transform(dataset: DataFrame): DataFrame = {
+    transformSchema(dataset.schema, logging = true)
+    // The mllib model's `transform` is presumably overloaded (single Vector vs. RDD input);
+    // an explicit lambda pins the single-Vector variant instead of relying on eta-expansion.
+    val pcaOp = udf { (vector: Vector) => pcaModel.transform(vector) }
+    dataset.withColumn($(outputCol), pcaOp(col($(inputCol))))
+  }
+
+  /**
+   * Checks that the input column holds vectors and that the output column name is unused,
+   * then returns `schema` with a non-nullable vector output column appended.
+   */
+  override def transformSchema(schema: StructType): StructType = {
+    val inputType = schema($(inputCol)).dataType
+    require(inputType.isInstanceOf[VectorUDT],
+      s"Input column ${$(inputCol)} must be a vector column")
+    require(!schema.fieldNames.contains($(outputCol)),
+      s"Output column ${$(outputCol)} already exists.")
+    val outputFields = schema.fields :+ StructField($(outputCol), new VectorUDT, false)
+    StructType(outputFields)
+  }
+
+  /**
+   * Returns a new model that shares the same uid and underlying mllib model, with the current
+   * params plus `extra` applied. The parent estimator is carried over as well, because
+   * `copyValues` only copies param values.
+   */
+  override def copy(extra: ParamMap): PCAModel = {
+    val copied = new PCAModel(uid, pcaModel)
+    copyValues(copied, extra).setParent(parent)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c1befd78/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
index 4e01e40..2a66263 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
@@ -68,7 +68,7 @@ class PCA(val k: Int) {
  * @param k number of principal components.
  * @param pc a principal components Matrix. Each column is one principal 
component.
  */
-class PCAModel private[mllib] (val k: Int, val pc: DenseMatrix) extends 
VectorTransformer {
+class PCAModel private[spark] (val k: Int, val pc: DenseMatrix) extends 
VectorTransformer {
   /**
    * Transform a vector by computed Principal Components.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/c1befd78/mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala
new file mode 100644
index 0000000..d0ae36b
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.param.ParamsSuite
+import org.apache.spark.mllib.linalg.distributed.RowMatrix
+import org.apache.spark.mllib.linalg.{Vector, Vectors, DenseMatrix, Matrices}
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.mllib.util.TestingUtils._
+import org.apache.spark.mllib.feature.{PCAModel => OldPCAModel}
+import org.apache.spark.sql.Row
+
+class PCASuite extends SparkFunSuite with MLlibTestSparkContext {
+
+  test("params") {
+    ParamsSuite.checkParams(new PCA)
+    val mat = Matrices.dense(2, 2, Array(0.0, 1.0, 2.0, 3.0)).asInstanceOf[DenseMatrix]
+    val model = new PCAModel("pca", new OldPCAModel(2, mat))
+    ParamsSuite.checkParams(model)
+  }
+
+  test("pca") {
+    val data = Array(
+      Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
+      Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
+      Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
+    )
+
+    val dataRDD = sc.parallelize(data, 2)
+
+    // Reference projection computed directly with the spark.mllib RowMatrix API.
+    val mat = new RowMatrix(dataRDD)
+    val pc = mat.computePrincipalComponents(3)
+    val expected = mat.multiply(pc).rows
+
+    val df = sqlContext.createDataFrame(dataRDD.zip(expected)).toDF("features", "expected")
+
+    val pcaModel = new PCA()
+      .setInputCol("features")
+      .setOutputCol("pca_features")
+      .setK(3)
+      .fit(df)
+
+    val results = pcaModel.transform(df).select("pca_features", "expected").collect()
+    // Guard against the per-row check below passing vacuously on an empty result.
+    assert(results.length === data.length)
+    results.foreach {
+      case Row(x: Vector, y: Vector) =>
+        assert(x ~== y absTol 1e-5, "Transformed vector is different with expected vector.")
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to