Github user mengxr commented on a diff in the pull request: https://github.com/apache/incubator-spark/pull/564#discussion_r9983939 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD + +import org.apache.spark.mllib.util._ + + +/** + * Class used to obtain principal components + */ +class PCA { + private var k: Int = 1 + + /** + * Set the number of top-k principle components to return + */ + def setK(k: Int): PCA = { + this.k = k + this + } + + /** + * Compute PCA using the current set parameters + */ + def compute(matrix: DenseMatrix): DenseMatrix = { + computePCA(matrix, k) + } + + /** + * Principal Component Analysis. + * Computes the top k principal component coefficients for the m-by-n data matrix X. + * Rows of X correspond to observations and columns correspond to variables. + * The coefficient matrix is n-by-k. Each column of coeff contains coefficients + * for one principal component, and the columns are in descending + * order of component variance. + * This function centers the data and uses the + * singular value decomposition (SVD) algorithm. + * + * All input and output is expected in DenseMatrix format + * + * @param matrix dense matrix to perform pca on + * @param k Recover k principal components + * @return An nxk matrix of principal components + */ + def computePCA(matrix: DenseMatrix, k: Int): DenseMatrix = { + val m = matrix.m + val n = matrix.n + + if (m <= 0 || n <= 0) { + throw new IllegalArgumentException("Expecting a well-formed matrix") + } + + // compute column sums and normalize matrix + val rawData = matrix.rows.flatMap{ + x => Array.tabulate(x.data.size)(idx => MatrixEntry(x.i, idx, x.data(idx))) + } + val colSums = rawData.map(entry => (entry.j, entry.mval)).reduceByKey(_ + _) + val data = rawData.map(entry => (entry.j, (entry.i, entry.mval))).join(colSums).map{ + case (col, ((row, mval), colsum)) => + MatrixEntry(row, col, (mval - colsum / m.toDouble) / Math.sqrt(n-1)) } --- End diff -- It creates many small objects and breaks the data continuity of each row. The following code should be adequate for computing column sums: ~~~ def addi(a: Array[Double], b: Array[Double]): Array[Double] = ... val colsums = matrix.rows.fold(new Array[Double](matrix.n))(addi) ~~~ I ran a small test of size 1000000x100, where it is about 50x faster than flat-mapping rows to entries and then reducing by column indices.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. To do so, please top-post your response. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---