Repository: mahout Updated Branches: refs/heads/master fa9aeaccc -> 771339c82
MAHOUT-1922 Propagate cacheHint in dspca closes apache/mahout#275 Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/771339c8 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/771339c8 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/771339c8 Branch: refs/heads/master Commit: 771339c82c36929d5b331215fbe792626de373af Parents: fa9aeac Author: rawkintrevo <[email protected]> Authored: Mon Feb 6 21:59:21 2017 -0600 Committer: rawkintrevo <[email protected]> Committed: Mon Feb 6 21:59:21 2017 -0600 ---------------------------------------------------------------------- .../mahout/math/decompositions/DSPCA.scala | 22 ++++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/771339c8/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala index 78cfb8b..2c010bb 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala @@ -37,13 +37,17 @@ object DSPCA { * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them * e.g. save them to hdfs in order to trigger their computation. 
*/ - def dspca[K](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0): + def dspca[K](drmA: DrmLike[K], + k: Int, + p: Int = 15, + q: Int = 0, + cacheHint: CacheHint.CacheHint = CacheHint.MEMORY_ONLY): (DrmLike[K], DrmLike[Int], Vector) = { // Some mapBlock() calls need it implicit val ktag = drmA.keyClassTag - val drmAcp = drmA.checkpoint() + val drmAcp = drmA.checkpoint(cacheHint) implicit val ctx = drmAcp.context val m = drmAcp.nrow @@ -79,16 +83,16 @@ object DSPCA { keys → blockY } // Checkpoint Y - .checkpoint() + .checkpoint(cacheHint) - var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint() + var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint(cacheHint) var s_q = drmQ.colSums() var bcastVarS_q = drmBroadcast(s_q) // This actually should be optimized as identically partitioned map-side A'B since A and Q should // still be identically partitioned. - var drmBt = (drmAcp.t %*% drmQ).checkpoint() + var drmBt = (drmAcp.t %*% drmQ).checkpoint(cacheHint) var s_b = (drmBt.t %*% mu).collect(::, 0) var bcastVarS_b = drmBroadcast(s_b) @@ -126,23 +130,23 @@ object DSPCA { keys → block } // Checkpoint Y - .checkpoint() + .checkpoint(cacheHint) - drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint() + drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint(cacheHint) s_q = drmQ.colSums() bcastVarS_q = drmBroadcast(s_q) // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not // identically partitioned anymore. - drmBt = (drmAcp.t %*% drmQ).checkpoint() + drmBt = (drmAcp.t %*% drmQ).checkpoint(cacheHint) s_b = (drmBt.t %*% mu).collect(::, 0) bcastVarS_b = drmBroadcast(s_b) } val c = s_q cross s_b - val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect -=: + val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(cacheHint).collect -=: c -=: c.t +=: mtm *=: (s_q cross s_q) val (inCoreUHat, d) = eigen(inCoreBBt) val s = d.sqrt
