Repository: spark
Updated Branches:
  refs/heads/master e29704f90 -> 21b3d2a75
[SPARK-11530][MLLIB] Return eigenvalues with PCA model

Add `computePrincipalComponentsAndExplainedVariance` to also compute PCA's explained variance.

CC mengxr

Author: Sean Owen <[email protected]>

Closes #9736 from srowen/SPARK-11530.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21b3d2a7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21b3d2a7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21b3d2a7

Branch: refs/heads/master
Commit: 21b3d2a75f679b252e293000d706741dca33624a
Parents: e29704f
Author: Sean Owen <[email protected]>
Authored: Thu Dec 10 14:05:45 2015 +0000
Committer: Sean Owen <[email protected]>
Committed: Thu Dec 10 14:05:45 2015 +0000

----------------------------------------------------------------------
 .../scala/org/apache/spark/ml/feature/PCA.scala | 20 ++++++------
 .../org/apache/spark/mllib/feature/PCA.scala    | 16 +++++++---
 .../mllib/linalg/distributed/RowMatrix.scala    | 33 +++++++++++++++-----
 .../org/apache/spark/ml/feature/PCASuite.scala  |  7 +++--
 .../apache/spark/mllib/feature/PCASuite.scala   |  3 +-
 .../linalg/distributed/RowMatrixSuite.scala     | 10 +++++-
 project/MimaExcludes.scala                      |  3 ++
 7 files changed, 67 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/21b3d2a7/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
index aa88cb0..53d33ea 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
@@ -73,7 +73,7 @@ class PCA (override val uid: String) extends Estimator[PCAModel] with PCAParams
     val input = dataset.select($(inputCol)).map { case Row(v: Vector) => v}
     val pca = new feature.PCA(k = $(k))
     val pcaModel = pca.fit(input)
-    copyValues(new PCAModel(uid, pcaModel.pc).setParent(this))
+    copyValues(new PCAModel(uid, pcaModel.pc, pcaModel.explainedVariance).setParent(this))
   }
 
   override def transformSchema(schema: StructType): StructType = {
@@ -105,7 +105,8 @@ object PCA extends DefaultParamsReadable[PCA] {
 @Experimental
 class PCAModel private[ml] (
     override val uid: String,
-    val pc: DenseMatrix)
+    val pc: DenseMatrix,
+    val explainedVariance: DenseVector)
   extends Model[PCAModel] with PCAParams with MLWritable {
 
   import PCAModel._
@@ -123,7 +124,7 @@ class PCAModel private[ml] (
    */
   override def transform(dataset: DataFrame): DataFrame = {
     transformSchema(dataset.schema, logging = true)
-    val pcaModel = new feature.PCAModel($(k), pc)
+    val pcaModel = new feature.PCAModel($(k), pc, explainedVariance)
     val pcaOp = udf { pcaModel.transform _ }
     dataset.withColumn($(outputCol), pcaOp(col($(inputCol))))
   }
@@ -139,7 +140,7 @@ class PCAModel private[ml] (
   }
 
   override def copy(extra: ParamMap): PCAModel = {
-    val copied = new PCAModel(uid, pc)
+    val copied = new PCAModel(uid, pc, explainedVariance)
     copyValues(copied, extra).setParent(parent)
   }
 
@@ -152,11 +153,11 @@ object PCAModel extends MLReadable[PCAModel] {
 
   private[PCAModel] class PCAModelWriter(instance: PCAModel) extends MLWriter {
 
-    private case class Data(pc: DenseMatrix)
+    private case class Data(pc: DenseMatrix, explainedVariance: DenseVector)
 
     override protected def saveImpl(path: String): Unit = {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
-      val data = Data(instance.pc)
+      val data = Data(instance.pc, instance.explainedVariance)
       val dataPath = new Path(path, "data").toString
       sqlContext.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
     }
@@ -169,10 +170,11 @@ object PCAModel extends MLReadable[PCAModel] {
     override def load(path: String): PCAModel = {
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
       val dataPath = new Path(path, "data").toString
-      val Row(pc: DenseMatrix) = sqlContext.read.parquet(dataPath)
-        .select("pc")
+      val Row(pc: DenseMatrix, explainedVariance: DenseVector) =
+        sqlContext.read.parquet(dataPath)
+          .select("pc", "explainedVariance")
         .head()
-      val model = new PCAModel(metadata.uid, pc)
+      val model = new PCAModel(metadata.uid, pc, explainedVariance)
       DefaultParamsReader.getAndSetParams(model, metadata)
       model
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/21b3d2a7/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
index ecb3c1e..24e0a98 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/PCA.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.mllib.feature
 
-import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.mllib.linalg._
 import org.apache.spark.mllib.linalg.distributed.RowMatrix
@@ -43,7 +43,8 @@ class PCA @Since("1.4.0") (@Since("1.4.0") val k: Int) {
       s"source vector size is ${sources.first().size} must be greater than k=$k")
 
     val mat = new RowMatrix(sources)
-    val pc = mat.computePrincipalComponents(k) match {
+    val (pc, explainedVariance) = mat.computePrincipalComponentsAndExplainedVariance(k)
+    val densePC = pc match {
       case dm: DenseMatrix =>
         dm
       case sm: SparseMatrix =>
@@ -58,7 +59,13 @@ class PCA @Since("1.4.0") (@Since("1.4.0") val k: Int) {
           s"SparseMatrix or DenseMatrix. Instead got: ${m.getClass}")
     }
 
-    new PCAModel(k, pc)
+    val denseExplainedVariance = explainedVariance match {
+      case dv: DenseVector =>
+        dv
+      case sv: SparseVector =>
+        sv.toDense
+    }
+    new PCAModel(k, densePC, denseExplainedVariance)
   }
 
   /**
@@ -77,7 +84,8 @@ class PCA @Since("1.4.0") (@Since("1.4.0") val k: Int) {
 @Since("1.4.0")
 class PCAModel private[spark] (
     @Since("1.4.0") val k: Int,
-    @Since("1.4.0") val pc: DenseMatrix) extends VectorTransformer {
+    @Since("1.4.0") val pc: DenseMatrix,
+    @Since("1.6.0") val explainedVariance: DenseVector) extends VectorTransformer {
   /**
    * Transform a vector by computed Principal Components.
   *
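For reference, the explained variance added in the RowMatrix change below is just each eigenvalue of the covariance matrix divided by the sum of all eigenvalues. A minimal standalone sketch of the same computation using Breeze directly (the covariance values here are made up for illustration, not taken from this patch):

    import breeze.linalg.{svd, sum, DenseMatrix => BDM}

    // arbitrary 2x2 covariance matrix (symmetric positive semi-definite)
    val cov = BDM((2.0, 0.3), (0.3, 1.0))
    // for a PSD matrix the singular values equal the eigenvalues
    val svd.SVD(_, s, _) = svd(cov)
    // fraction of total variance attributed to each principal component
    val explainedVariance = s.map(_ / sum(s))
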
http://git-wip-us.apache.org/repos/asf/spark/blob/21b3d2a7/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 52c0f19..2018a67 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -368,7 +368,8 @@ class RowMatrix @Since("1.0.0") (
   }
 
   /**
-   * Computes the top k principal components.
+   * Computes the top k principal components and a vector of proportions of
+   * variance explained by each principal component.
    * Rows correspond to observations and columns correspond to variables.
    * The principal components are stored a local matrix of size n-by-k.
    * Each column corresponds for one principal component,
@@ -379,25 +380,43 @@ class RowMatrix @Since("1.0.0") (
    * Note that this cannot be computed on matrices with more than 65535 columns.
    *
    * @param k number of top principal components.
-   * @return a matrix of size n-by-k, whose columns are principal components
+   * @return a matrix of size n-by-k, whose columns are principal components, and
+   * a vector of values which indicate how much variance each principal component
+   * explains
    */
-  @Since("1.0.0")
-  def computePrincipalComponents(k: Int): Matrix = {
+  @Since("1.6.0")
+  def computePrincipalComponentsAndExplainedVariance(k: Int): (Matrix, Vector) = {
     val n = numCols().toInt
     require(k > 0 && k <= n, s"k = $k out of range (0, n = $n]")
 
     val Cov = computeCovariance().toBreeze.asInstanceOf[BDM[Double]]
 
-    val brzSvd.SVD(u: BDM[Double], _, _) = brzSvd(Cov)
+    val brzSvd.SVD(u: BDM[Double], s: BDV[Double], _) = brzSvd(Cov)
+
+    val eigenSum = s.data.sum
+    val explainedVariance = s.data.map(_ / eigenSum)
 
     if (k == n) {
-      Matrices.dense(n, k, u.data)
+      (Matrices.dense(n, k, u.data), Vectors.dense(explainedVariance))
     } else {
-      Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k))
+      (Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k)),
+        Vectors.dense(Arrays.copyOfRange(explainedVariance, 0, k)))
     }
   }
 
   /**
+   * Computes the top k principal components only.
+   *
+   * @param k number of top principal components.
+   * @return a matrix of size n-by-k, whose columns are principal components
+   * @see computePrincipalComponentsAndExplainedVariance
+   */
+  @Since("1.0.0")
+  def computePrincipalComponents(k: Int): Matrix = {
+    computePrincipalComponentsAndExplainedVariance(k)._1
+  }
+
+  /**
    * Computes column-wise summary statistics.
    */
   @Since("1.0.0")
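One behavior of the new method worth noting: when k < n, the returned proportions vector is simply the first k entries of the full vector, so its entries need not sum to 1. A tiny illustration with hypothetical eigenvalues (chosen to match the shape of the test fixture below):

    // hypothetical eigenvalues of a 3x3 covariance matrix
    val eigenvalues = Array(4.0, 3.0, 0.0)
    val total = eigenvalues.sum                 // 7.0
    val fractions = eigenvalues.map(_ / total)  // 4/7, 3/7, 0
    // for k = 2, only the first two proportions are returned
    val topK = java.util.Arrays.copyOfRange(fractions, 0, 2)
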
http://git-wip-us.apache.org/repos/asf/spark/blob/21b3d2a7/mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala
index edab21e..9f6618b 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala
@@ -24,7 +24,6 @@ import org.apache.spark.mllib.linalg.distributed.RowMatrix
 import org.apache.spark.mllib.linalg._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.util.TestingUtils._
-import org.apache.spark.mllib.feature.{PCAModel => OldPCAModel}
 import org.apache.spark.sql.Row
 
 class PCASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
@@ -32,7 +31,8 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
   test("params") {
     ParamsSuite.checkParams(new PCA)
     val mat = Matrices.dense(2, 2, Array(0.0, 1.0, 2.0, 3.0)).asInstanceOf[DenseMatrix]
-    val model = new PCAModel("pca", mat)
+    val explainedVariance = Vectors.dense(0.5, 0.5).asInstanceOf[DenseVector]
+    val model = new PCAModel("pca", mat, explainedVariance)
     ParamsSuite.checkParams(model)
   }
 
@@ -76,7 +76,8 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
 
   test("PCAModel read/write") {
     val instance = new PCAModel("myPCAModel",
-      Matrices.dense(2, 2, Array(0.0, 1.0, 2.0, 3.0)).asInstanceOf[DenseMatrix])
+      Matrices.dense(2, 2, Array(0.0, 1.0, 2.0, 3.0)).asInstanceOf[DenseMatrix],
+      Vectors.dense(0.5, 0.5).asInstanceOf[DenseVector])
     val newInstance = testDefaultReadWrite(instance)
     assert(newInstance.pc === instance.pc)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/21b3d2a7/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
index e57f491..a8d8293 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
@@ -37,11 +37,12 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext {
     val pca = new PCA(k).fit(dataRDD)
 
     val mat = new RowMatrix(dataRDD)
-    val pc = mat.computePrincipalComponents(k)
+    val (pc, explainedVariance) = mat.computePrincipalComponentsAndExplainedVariance(k)
 
     val pca_transform = pca.transform(dataRDD).collect()
     val mat_multiply = mat.multiply(pc).rows.collect()
 
     assert(pca_transform.toSet === mat_multiply.toSet)
+    assert(pca.explainedVariance === explainedVariance)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/21b3d2a7/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
index 4abb98f..0ff901d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.mllib.linalg.distributed
 
+import java.util.Arrays
+
 import scala.util.Random
 
 import breeze.numerics.abs
@@ -49,6 +51,7 @@ class RowMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
     (0.0, 1.0, 0.0),
     (math.sqrt(2.0) / 2.0, 0.0, math.sqrt(2.0) / 2.0),
     (math.sqrt(2.0) / 2.0, 0.0, - math.sqrt(2.0) / 2.0))
+  val explainedVariance = BDV(4.0 / 7.0, 3.0 / 7.0, 0.0)
 
   var denseMat: RowMatrix = _
   var sparseMat: RowMatrix = _
@@ -201,10 +204,15 @@ class RowMatrixSuite extends SparkFunSuite with MLlibTestSparkContext {
 
   test("pca") {
     for (mat <- Seq(denseMat, sparseMat); k <- 1 to n) {
-      val pc = denseMat.computePrincipalComponents(k)
+      val (pc, expVariance) = mat.computePrincipalComponentsAndExplainedVariance(k)
       assert(pc.numRows === n)
       assert(pc.numCols === k)
       assertColumnEqualUpToSign(pc.toBreeze.asInstanceOf[BDM[Double]], principalComponents, k)
+      assert(
+        closeToZero(BDV(expVariance.toArray) -
+        BDV(Arrays.copyOfRange(explainedVariance.data, 0, k))))
+      // Check that this method returns the same answer
+      assert(pc === mat.computePrincipalComponents(k))
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/21b3d2a7/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 685cb41..edae59d 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -58,6 +58,9 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingMethodProblem](
           "org.apache.spark.ml.classification.LogisticRegressionSummary.featuresCol")
       ) ++ Seq(
+        // SPARK-11530
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.mllib.feature.PCAModel.this")
+      ) ++ Seq(
         // SPARK-10381 Fix types / units in private AskPermissionToCommitOutput RPC message.
         // This class is marked as `private` but MiMa still seems to be confused by the change.
         ProblemFilters.exclude[MissingMethodProblem](
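Finally, a usage sketch of the RowMatrix API added in this commit (the input vectors are hypothetical, and a running SparkContext `sc` is assumed):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.linalg.distributed.RowMatrix

    val rows = sc.parallelize(Seq(
      Vectors.dense(1.0, 2.0, 0.0),
      Vectors.dense(2.0, 0.0, 3.0),
      Vectors.dense(4.0, 1.0, 1.0)))
    val mat = new RowMatrix(rows)
    // top 2 principal components plus the fraction of variance each explains
    val (pc, explainedVariance) = mat.computePrincipalComponentsAndExplainedVariance(2)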
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]