MAHOUT-1757:Small fix in spca formula, this closes Mahout#152
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/7f321e08 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/7f321e08 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/7f321e08 Branch: refs/heads/mahout-0.10.x Commit: 7f321e080cb011de8cfe8dfb6dd7a231c6196432 Parents: 2eb30f4 Author: smarthi <[email protected]> Authored: Fri Jul 31 15:25:19 2015 -0400 Committer: smarthi <[email protected]> Committed: Fri Jul 31 15:25:19 2015 -0400 ---------------------------------------------------------------------- .../mahout/math/decompositions/DSPCA.scala | 49 +++++++++++--------- .../mahout/math/decompositions/SSVD.scala | 26 ++++++----- .../DistributedDecompositionsSuite.scala | 6 +-- 3 files changed, 42 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/7f321e08/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala index de7402d..c98ee2e 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala @@ -53,7 +53,9 @@ object DSPCA { val r = k + pfxed // Dataset mean - val xi = drmAcp.colMeans + val mu = drmAcp.colMeans + + val mtm = mu dot mu // We represent Omega by its seed. val omegaSeed = RandomUtils.getRandom().nextInt() @@ -62,17 +64,17 @@ object DSPCA { // This done in front in a single-threaded fashion for now. Even though it doesn't require any // memory beyond that is required to keep xi around, it still might be parallelized to backs // for significantly big n and r. 
TODO - val s_o = omega.t %*% xi + val s_o = omega.t %*% mu val bcastS_o = drmBroadcast(s_o) - val bcastXi = drmBroadcast(xi) + val bcastMu = drmBroadcast(mu) var drmY = drmAcp.mapBlock(ncol = r) { - case (keys, blockA) => + case (keys, blockA) ⇒ val s_o:Vector = bcastS_o val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed) - for (row <- 0 until blockY.nrow) blockY(row, ::) -= s_o - keys -> blockY + for (row ← 0 until blockY.nrow) blockY(row, ::) -= s_o + keys → blockY } // Checkpoint Y .checkpoint() @@ -86,39 +88,40 @@ object DSPCA { // still be identically partitioned. var drmBt = (drmAcp.t %*% drmQ).checkpoint() - var s_b = (drmBt.t %*% xi).collect(::, 0) + var s_b = (drmBt.t %*% mu).collect(::, 0) var bcastVarS_b = drmBroadcast(s_b) - for (i <- 0 until q) { + for (i ← 0 until q) { // These closures don't seem to live well with outside-scope vars. This doesn't record closure // attributes correctly. So we create additional set of vals for broadcast vars to properly // create readonly closure attributes in this very scope. 
val bcastS_q = bcastVarS_q - val bcastS_b = bcastVarS_b - val bcastXib = bcastXi + val bcastMuInner = bcastMu // Fix Bt as B' -= xi cross s_q drmBt = drmBt.mapBlock() { - case (keys, block) => + case (keys, block) ⇒ val s_q: Vector = bcastS_q - val xi: Vector = bcastXib + val mu: Vector = bcastMuInner keys.zipWithIndex.foreach { - case (key, idx) => block(idx, ::) -= s_q * xi(key) + case (key, idx) ⇒ block(idx, ::) -= s_q * mu(key) } - keys -> block + keys → block } drmY.uncache() drmQ.uncache() + val bCastSt_b = drmBroadcast(s_b -=: mtm * s_q) + drmY = (drmAcp %*% drmBt) - // Fix Y by subtracting s_b from each row of the AB' + // Fix Y by subtracting st_b from each row of the AB' .mapBlock() { - case (keys, block) => - val s_b: Vector = bcastS_b - for (row <- 0 until block.nrow) block(row, ::) -= s_b - keys -> block + case (keys, block) ⇒ + val st_b: Vector = bCastSt_b + block := { (_, c, v) ⇒ v - st_b(c) } + keys → block } // Checkpoint Y .checkpoint() @@ -132,20 +135,20 @@ object DSPCA { // identically partitioned anymore. drmBt = (drmAcp.t %*% drmQ).checkpoint() - s_b = (drmBt.t %*% xi).collect(::, 0) + s_b = (drmBt.t %*% mu).collect(::, 0) bcastVarS_b = drmBroadcast(s_b) } val c = s_q cross s_b - val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect - - c - c.t + (s_q cross s_q) * (xi dot xi) + val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect -=: + c -=: c.t +=: mtm *=: (s_q cross s_q) val (inCoreUHat, d) = eigen(inCoreBBt) val s = d.sqrt // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it? 
val drmU = drmQ %*% inCoreUHat - val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s)) + val drmV = drmBt %*% (inCoreUHat %*% diagv(1 / s)) (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k)) } http://git-wip-us.apache.org/repos/asf/mahout/blob/7f321e08/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala index e1b2f03..fba9517 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala @@ -59,7 +59,7 @@ private[math] object SSVD { var bt = ch.solveRight(at %*% y) // Power iterations - for (i <- 0 until q) { + for (i ← 0 until q) { y = a %*% bt yty = y.t %*% y ch = chol(yty) @@ -71,7 +71,7 @@ private[math] object SSVD { val s = d.sqrt val u = ch.solveRight(y) %*% uhat - val v = bt %*% (uhat %*%: diagv(1 /: s)) + val v = bt %*% (uhat %*% diagv(1 /: s)) (u(::, 0 until k), v(::, 0 until k), s(0 until k)) } @@ -108,15 +108,16 @@ private[math] object SSVD { val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt) // Dataset mean - val xi = a.colMeans() + val mu = a.colMeans() + val mtm = mu dot mu - if (log.isDebugEnabled) log.debug("xi=%s".format(xi)) + if (log.isDebugEnabled) log.debug("xi=%s".format(mu)) var y = a %*% omega // Fixing y - val s_o = omega.t %*% xi - y := ((r,c,v) => v - s_o(c)) + val s_o = omega.t %*% mu + y := ((r,c,v) ⇒ v - s_o(c)) var yty = y.t %*% y var ch = chol(yty) @@ -126,31 +127,32 @@ private[math] object SSVD { var qm = ch.solveRight(y) var bt = a.t %*% qm var s_q = qm.colSums() - var s_b = bt.t %*% xi + var s_b = bt.t %*% mu // Power iterations - for (i <- 0 until q) { + for (i ← 0 until q) { // Fix bt - bt -= xi cross s_q + bt -= mu cross s_q y = a %*% bt // Fix Y again. 
- y := ((r,c,v) => v - s_b(c)) + val st_b = s_b -=: mtm * s_q + y := ((r,c,v) ⇒ v - st_b(c)) yty = y.t %*% y ch = chol(yty) qm = ch.solveRight(y) bt = a.t %*% qm s_q = qm.colSums() - s_b = bt.t %*% xi + s_b = bt.t %*% mu } val c = s_q cross s_b // BB' computation becomes - val bbt = bt.t %*% bt - c - c.t + (s_q cross s_q) * (xi dot xi) + val bbt = bt.t %*% bt -= c -= c.t += (mtm * s_q cross s_q) val (uhat, d) = eigen(bbt) http://git-wip-us.apache.org/repos/asf/mahout/blob/7f321e08/spark/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuite.scala b/spark/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuite.scala index 0a0c1af..d340ed2 100644 --- a/spark/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuite.scala @@ -28,7 +28,5 @@ import scala.math._ import org.scalatest.{Matchers, FunSuite} import org.apache.mahout.sparkbindings.test.DistributedSparkSuite -class DistributedDecompositionsSuite extends FunSuite with DistributedSparkSuite with DistributedDecompositionsSuiteBase { - - -} +class DistributedDecompositionsSuite extends FunSuite +with DistributedSparkSuite with DistributedDecompositionsSuiteBase
