Repository: mahout
Updated Branches:
  refs/heads/master 56a2305ed -> 66f164057
NOJIRA: more parameter naming conventions (style)

Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/66f16405
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/66f16405
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/66f16405

Branch: refs/heads/master
Commit: 66f164057e322d2e63ea02c35c9e30c3969e80b1
Parents: 56a2305
Author: Dmitriy Lyubimov <[email protected]>
Authored: Thu Jul 31 18:06:27 2014 -0700
Committer: Dmitriy Lyubimov <[email protected]>
Committed: Thu Jul 31 18:06:27 2014 -0700

----------------------------------------------------------------------
 .../apache/mahout/math/decompositions/ALS.scala |  7 ++---
 .../apache/mahout/math/decompositions/DQR.scala | 10 +++---
 .../mahout/math/decompositions/DSPCA.scala      | 22 +++++++-------
 .../mahout/math/decompositions/DSSVD.scala      | 18 +++++------
 .../mahout/math/decompositions/package.scala    | 32 ++++++++++++--------
 .../DistributedDecompositionsSuiteBase.scala    |  4 +--
 6 files changed, 50 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/66f16405/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
index 5aed649..4e2f45c 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
@@ -65,7 +65,7 @@ private[math] object ALS {
    * whichever earlier.
    * <P>
    *
-   * @param drmInput The input matrix
+   * @param drmA The input matrix
    * @param k required rank of decomposition (number of cols in U and V results)
    * @param convergenceThreshold stop sooner if (rmse[i-1] - rmse[i])/rmse[i - 1] is less than this
    *                             value. If <=0 then we won't compute RMSE and use convergence test.
@@ -75,7 +75,7 @@ private[math] object ALS {
    * @return { @link org.apache.mahout.math.drm.decompositions.ALS.Result}
    */
   def dals[K: ClassTag](
-      drmInput: DrmLike[K],
+      drmA: DrmLike[K],
       k: Int = 50,
       lambda: Double = 0.0,
       maxIterations: Int = 10,
@@ -85,8 +85,7 @@ private[math] object ALS {
     assert(convergenceThreshold < 1.0, "convergenceThreshold")
     assert(maxIterations >= 1, "maxIterations")
 
-    val drmA = drmInput
-    val drmAt = drmInput.t
+    val drmAt = drmA.t
 
     // Initialize U and V so that they are identically distributed to A or A'
     var drmU = drmA.mapBlock(ncol = k) {


http://git-wip-us.apache.org/repos/asf/mahout/blob/66f16405/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
index 4ca99b1..7caa3dd 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
@@ -38,14 +38,14 @@ object DQR {
    * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
    * their RDD should be able to zip successfully.
    */
-  def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = {
+  def dqrThin[K: ClassTag](drmA: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = {
 
-    if (A.ncol > 5000)
+    if (drmA.ncol > 5000)
       log.warn("A is too fat. A'A must fit in memory and easily broadcasted.")
 
-    implicit val ctx = A.context
+    implicit val ctx = drmA.context
 
-    val AtA = (A.t %*% A).checkpoint()
+    val AtA = (drmA.t %*% drmA).checkpoint()
     val inCoreAtA = AtA.collect
 
     if (log.isDebugEnabled) log.debug("A'A=\n%s\n".format(inCoreAtA))
@@ -64,7 +64,7 @@ object DQR {
     // decompose A'A in the backend again.
 
     // Compute Q = A*inv(L') -- we can do it blockwise.
-    val Q = A.mapBlock() {
+    val Q = drmA.mapBlock() {
       case (keys, block) => keys -> chol(bcastAtA).solveRight(block)
     }
 


http://git-wip-us.apache.org/repos/asf/mahout/blob/66f16405/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
index 37c218a..de7402d 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
@@ -31,21 +31,21 @@ object DSPCA {
    * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
    * document of the MAHOUT-817.
    *
-   * @param A input matrix A
+   * @param drmA input matrix A
    * @param k request SSVD rank
    * @param p oversampling parameter
    * @param q number of power iterations (hint: use either 0 or 1)
    * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
    *         e.g. save them to hdfs in order to trigger their computation.
    */
-  def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  def dspca[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
   (DrmLike[K], DrmLike[Int], Vector) = {
 
-    val drmA = A.checkpoint()
-    implicit val ctx = A.context
+    val drmAcp = drmA.checkpoint()
+    implicit val ctx = drmAcp.context
 
-    val m = drmA.nrow
-    val n = drmA.ncol
+    val m = drmAcp.nrow
+    val n = drmAcp.ncol
     assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
     val pfxed = safeToNonNegInt((m min n) - k min p)
@@ -53,7 +53,7 @@ object DSPCA {
     val r = k + pfxed
 
     // Dataset mean
-    val xi = drmA.colMeans
+    val xi = drmAcp.colMeans
 
     // We represent Omega by its seed.
     val omegaSeed = RandomUtils.getRandom().nextInt()
@@ -67,7 +67,7 @@ object DSPCA {
     val bcastS_o = drmBroadcast(s_o)
     val bcastXi = drmBroadcast(xi)
 
-    var drmY = drmA.mapBlock(ncol = r) {
+    var drmY = drmAcp.mapBlock(ncol = r) {
       case (keys, blockA) =>
         val s_o:Vector = bcastS_o
         val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
@@ -84,7 +84,7 @@ object DSPCA {
     // This actually should be optimized as identically partitioned map-side A'B since A and Q should
     // still be identically partitioned.
-    var drmBt = (drmA.t %*% drmQ).checkpoint()
+    var drmBt = (drmAcp.t %*% drmQ).checkpoint()
 
     var s_b = (drmBt.t %*% xi).collect(::, 0)
     var bcastVarS_b = drmBroadcast(s_b)
@@ -112,7 +112,7 @@ object DSPCA {
       drmY.uncache()
       drmQ.uncache()
 
-      drmY = (drmA %*% drmBt)
+      drmY = (drmAcp %*% drmBt)
           // Fix Y by subtracting s_b from each row of the AB'
           .mapBlock() {
         case (keys, block) =>
@@ -130,7 +130,7 @@ object DSPCA {
       // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
       // identically partitioned anymore.
-      drmBt = (drmA.t %*% drmQ).checkpoint()
+      drmBt = (drmAcp.t %*% drmQ).checkpoint()
 
       s_b = (drmBt.t %*% xi).collect(::, 0)
       bcastVarS_b = drmBroadcast(s_b)


http://git-wip-us.apache.org/repos/asf/mahout/blob/66f16405/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
index a158390..1abfb87 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
@@ -13,20 +13,20 @@ object DSSVD {
   /**
    * Distributed Stochastic Singular Value decomposition algorithm.
    *
-   * @param A input matrix A
+   * @param drmA input matrix A
    * @param k request SSVD rank
    * @param p oversampling parameter
    * @param q number of power iterations
    * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
    *         e.g. save them to hdfs in order to trigger their computation.
    */
-  def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  def dssvd[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
   (DrmLike[K], DrmLike[Int], Vector) = {
 
-    val drmA = A.checkpoint()
+    val drmAcp = drmA.checkpoint()
 
-    val m = drmA.nrow
-    val n = drmA.ncol
+    val m = drmAcp.nrow
+    val n = drmAcp.ncol
     assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
     val pfxed = safeToNonNegInt((m min n) - k min p)
@@ -39,7 +39,7 @@ object DSSVD {
     // Compute Y = A*Omega. Instead of redistributing view, we redistribute the Omega seed only and
     // instantiate the Omega random matrix view in the backend instead. That way serialized closure
     // is much more compact.
-    var drmY = drmA.mapBlock(ncol = r) {
+    var drmY = drmAcp.mapBlock(ncol = r) {
       case (keys, blockA) =>
         val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
         keys -> blockY
@@ -51,19 +51,19 @@ object DSSVD {
     // This actually should be optimized as identically partitioned map-side A'B since A and Q should
     // still be identically partitioned.
-    var drmBt = drmA.t %*% drmQ
+    var drmBt = drmAcp.t %*% drmQ
 
     // Checkpoint B' if last iteration
     if (q == 0) drmBt = drmBt.checkpoint()
 
     for (i <- 0 until q) {
-      drmY = drmA %*% drmBt
+      drmY = drmAcp %*% drmBt
       drmQ = dqrThin(drmY.checkpoint())._1
 
       // Checkpoint Q if last iteration
       if (i == q - 1) drmQ = drmQ.checkpoint()
 
       // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
      // identically partitioned anymore.
-      drmBt = drmA.t %*% drmQ
+      drmBt = drmAcp.t %*% drmQ
 
       // Checkpoint B' if last iteration
       if (i == q - 1) drmBt = drmBt.checkpoint()
     }


http://git-wip-us.apache.org/repos/asf/mahout/blob/66f16405/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala
index a3a8787..a7b829f 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala
@@ -28,6 +28,15 @@ package object decompositions {
 
   // ================ In-core decompositions ===================
 
+  /**
+   * In-core SSVD algorithm.
+   *
+   * @param a input matrix A
+   * @param k request SSVD rank
+   * @param p oversampling parameter
+   * @param q number of power iterations
+   * @return (U,V,s)
+   */
   def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = SSVD.ssvd(a, k, p, q)
 
   /**
@@ -50,7 +59,6 @@ package object decompositions {
   def spca(a: Matrix, k: Int, p: Int = 15, q: Int = 0) =
     SSVD.spca(a = a, k = k, p = p, q = q)
 
-
   // ============== Distributed decompositions ===================
 
   /**
@@ -62,35 +70,35 @@ package object decompositions {
    * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
    * their RDD should be able to zip successfully.
    */
-  def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) =
-    DQR.dqrThin(A, checkRankDeficiency)
+  def dqrThin[K: ClassTag](drmA: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) =
+    DQR.dqrThin(drmA, checkRankDeficiency)
 
   /**
    * Distributed Stochastic Singular Value decomposition algorithm.
    *
-   * @param A input matrix A
+   * @param drmA input matrix A
    * @param k request SSVD rank
    * @param p oversampling parameter
    * @param q number of power iterations
    * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
    *         e.g. save them to hdfs in order to trigger their computation.
    */
-  def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
-  (DrmLike[K], DrmLike[Int], Vector) = DSSVD.dssvd(A, k, p, q)
+  def dssvd[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  (DrmLike[K], DrmLike[Int], Vector) = DSSVD.dssvd(drmA, k, p, q)
 
   /**
    * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
    * document of the MAHOUT-817.
    *
-   * @param A input matrix A
+   * @param drmA input matrix A
    * @param k request SSVD rank
    * @param p oversampling parameter
    * @param q number of power iterations (hint: use either 0 or 1)
    * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
    *         e.g. save them to hdfs in order to trigger their computation.
    */
-  def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
-  (DrmLike[K], DrmLike[Int], Vector) = DSPCA.dspca(A, k, p, q)
+  def dspca[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
+  (DrmLike[K], DrmLike[Int], Vector) = DSPCA.dspca(drmA, k, p, q)
 
   /** Result for distributed ALS-type two-component factorization algorithms */
   type FactorizationResult[K] = ALS.Result[K]
@@ -112,7 +120,7 @@ package object decompositions {
    * whichever earlier.
    * <P>
    *
-   * @param drmInput The input matrix
+   * @param drmA The input matrix
    * @param k required rank of decomposition (number of cols in U and V results)
    * @param convergenceThreshold stop sooner if (rmse[i-1] - rmse[i])/rmse[i - 1] is less than this
    *                             value. If <=0 then we won't compute RMSE and use convergence test.
@@ -122,12 +130,12 @@ package object decompositions {
    * @return { @link org.apache.mahout.math.drm.decompositions.ALS.Result}
    */
   def dals[K: ClassTag](
-      drmInput: DrmLike[K],
+      drmA: DrmLike[K],
       k: Int = 50,
       lambda: Double = 0.0,
       maxIterations: Int = 10,
       convergenceThreshold: Double = 0.10
   ): FactorizationResult[K] =
-    ALS.dals(drmInput, k, lambda, maxIterations, convergenceThreshold)
+    ALS.dals(drmA, k, lambda, maxIterations, convergenceThreshold)
 
 }


http://git-wip-us.apache.org/repos/asf/mahout/blob/66f16405/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala
index d37ab17..740f6fc 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala
@@ -158,7 +158,7 @@ trait DistributedDecompositionsSuiteBase extends DistributedMahoutSuite with Mat
     val k = 10
 
     // Calculate just first 10 principal factors and reduce dimensionality.
-    var (drmPCA, _, s) = dspca(A = drmInput, k = 10, p = spectrumLen, q = 1)
+    var (drmPCA, _, s) = dspca(drmA = drmInput, k = 10, p = spectrumLen, q = 1)
     // Un-normalized pca data:
     drmPCA = drmPCA %*% diagv(s)
 
@@ -199,7 +199,7 @@ trait DistributedDecompositionsSuiteBase extends DistributedMahoutSuite with Mat
     val drmA = drmParallelize(inCoreA, numPartitions = 2)
 
     // Decompose using ALS
-    val (drmU, drmV, rmse) = dals(drmInput = drmA, k = 20).toTuple
+    val (drmU, drmV, rmse) = dals(drmA = drmA, k = 20).toTuple
     val inCoreU = drmU.collect
     val inCoreV = drmV.collect
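
----------------------------------------------------------------------

A note on call sites: with named arguments, callers now write drmA = ...
where they previously wrote A = ... (and, for dals, drmInput = ...). The
sketch below is illustrative only and is not part of this commit: the
matrix values and import list are assumptions, and it presumes an implicit
DistributedContext (e.g. a Spark-backed one) is in scope, as the test suite
obtains via DistributedMahoutSuite.

  import org.apache.mahout.math.scalabindings._
  import org.apache.mahout.math.drm._
  import org.apache.mahout.math.decompositions._

  // Illustrative 4 x 3 input; drmParallelize uses the implicit context.
  val inCoreA = dense((1, 2, 3), (3, 4, 5), (-1, 0, 1), (2, 2, 2))
  val drmA = drmParallelize(inCoreA, numPartitions = 2)

  // Named arguments read drmA after this change:
  val (drmU, drmV, s) = dssvd(drmA = drmA, k = 2, p = 1, q = 1)
  val (drmPca, _, sPca) = dspca(drmA = drmA, k = 2, p = 1, q = 1)
  val (drmQ, inCoreR) = dqrThin(drmA = drmA)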
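The same applies to dals, whose parameter was drmInput; its defaults
(k = 50, lambda = 0.0, maxIterations = 10, convergenceThreshold = 0.10)
are unchanged by this commit. A minimal sketch, reusing drmA from the
snippet above with an illustratively small rank:

  // The FactorizationResult unpacks via toTuple, as in the updated test.
  val (drmUAls, drmVAls, rmse) = dals(drmA = drmA, k = 2, maxIterations = 5).toTuple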
