http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala deleted file mode 100644 index 8ced112..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.decompositions - -import org.apache.mahout.math._ -import drm._ -import scalabindings._ -import RLikeDrmOps._ -import RLikeOps._ -import org.apache.log4j.Logger -import math._ -import org.apache.mahout.common.RandomUtils - -/** Simple ALS factorization algotithm. To solve, use train() method. */ -private[math] object ALS { - - private val log = Logger.getLogger(ALS.getClass) - - /** - * ALS training result. <P> - * - * <code>drmU %*% drmV.t</code> is supposed to approximate the input. 
- * - * @param drmU U matrix - * @param drmV V matrix - * @param iterationsRMSE RMSE values afeter each of iteration performed - */ - class Result[K](val drmU: DrmLike[K], val drmV: DrmLike[Int], val iterationsRMSE: Iterable[Double]) { - def toTuple = (drmU, drmV, iterationsRMSE) - } - - /** Result class for in-core results */ - class InCoreResult(val inCoreU: Matrix, inCoreV: Matrix, val iterationsRMSE: Iterable[Double]) { - def toTuple = (inCoreU, inCoreV, iterationsRMSE) - } - - /** - * Run Distributed ALS. - * <P> - * - * Example: - * - * <pre> - * val (u,v,errors) = als(input, k).toTuple - * </pre> - * - * ALS runs until (rmse[i-1]-rmse[i])/rmse[i-1] < convergenceThreshold, or i==maxIterations, - * whichever earlier. - * <P> - * - * @param drmA The input matrix - * @param k required rank of decomposition (number of cols in U and V results) - * @param convergenceThreshold stop sooner if (rmse[i-1] - rmse[i])/rmse[i - 1] is less than this - * value. If <=0 then we won't compute RMSE and use convergence test. 
- * @param lambda regularization rate - * @param maxIterations maximum iterations to run regardless of convergence - * @tparam K row key type of the input (100 is probably more than enough) - * @return { @link org.apache.mahout.math.drm.decompositions.ALS.Result} - */ - def dals[K]( - drmA: DrmLike[K], - k: Int = 50, - lambda: Double = 0.0, - maxIterations: Int = 10, - convergenceThreshold: Double = 0.10 - ): Result[K] = { - - assert(convergenceThreshold < 1.0, "convergenceThreshold") - assert(maxIterations >= 1, "maxIterations") - - // Some mapblock() usage may require to know ClassTag[K] bound - implicit val ktag = drmA.keyClassTag - - val drmAt = drmA.t - - // Initialize U and V so that they are identically distributed to A or A' - var drmU = drmA.mapBlock(ncol = k) { - case (keys, block) => - val rnd = RandomUtils.getRandom() - val uBlock = Matrices.symmetricUniformView(block.nrow, k, rnd.nextInt()) * 0.01 - keys -> uBlock - } - - var drmV: DrmLike[Int] = null - var rmseIterations: List[Double] = Nil - - // ALS iterator - var stop = false - var i = 0 - while (!stop && i < maxIterations) { - - // Alternate. This is really what ALS is. - if (drmV != null) drmV.uncache() - drmV = (drmAt %*% drmU %*% solve(drmU.t %*% drmU -: diag(lambda, k))).checkpoint() - - drmU.uncache() - drmU = (drmA %*% drmV %*% solve(drmV.t %*% drmV -: diag(lambda, k))).checkpoint() - - // Check if we are requested to do a convergence test; and do it if yes. - if (convergenceThreshold > 0) { - - val rmse = (drmA - drmU %*% drmV.t).norm / sqrt(drmA.ncol * drmA.nrow) - - if (i > 0) { - val rmsePrev = rmseIterations.last - val convergence = (rmsePrev - rmse) / rmsePrev - - if (convergence < 0) { - log.warn("Rmse increase of %f. Should not happen.".format(convergence)) - // I guess error growth can happen in ideal data case? 
- stop = true - } else if (convergence < convergenceThreshold) { - stop = true - } - } - rmseIterations :+= rmse - } - - i += 1 - } - - new Result(drmU, drmV, rmseIterations) - } - - -}
http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala deleted file mode 100644 index 389eba0..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.decompositions - -import org.apache.mahout.logging._ -import org.apache.mahout.math.Matrix -import org.apache.mahout.math.scalabindings._ -import RLikeOps._ -import org.apache.mahout.math.drm._ -import RLikeDrmOps._ - -object DQR { - - private final implicit val log = getLog(DQR.getClass) - - /** - * Distributed _thin_ QR. A'A must fit in a memory, i.e. if A is m x n, then n should be pretty - * controlled (<5000 or so). <P> - * - * It is recommended to checkpoint A since it does two passes over it. 
<P> - * - * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so - * their RDD should be able to zip successfully. - */ - def dqrThin[K](drmA: DrmLike[K], - checkRankDeficiency: Boolean = true, - cacheHint: CacheHint.CacheHint = CacheHint.MEMORY_ONLY): (DrmLike[K], Matrix) = { - - // Some mapBlock() calls need it - implicit val ktag = drmA.keyClassTag - - if (drmA.ncol > 5000) - warn("A is too fat. A'A must fit in memory and easily broadcasted.") - - implicit val ctx = drmA.context - - val AtA = (drmA.t %*% drmA).checkpoint(cacheHint) - val inCoreAtA = AtA.collect - - trace("A'A=\n%s\n".format(inCoreAtA)) - - val ch = chol(inCoreAtA) - val inCoreR = (ch.getL cloned) t - - trace("R=\n%s\n".format(inCoreR)) - - if (checkRankDeficiency && !ch.isPositiveDefinite) - throw new IllegalArgumentException("R is rank-deficient.") - - val bcastAtA = drmBroadcast(inCoreAtA) - - // Unfortunately, I don't think Cholesky decomposition is serializable to backend. So we re- - // decompose A'A in the backend again. - - // Compute Q = A*inv(L') -- we can do it blockwise. - val Q = drmA.mapBlock() { - case (keys, block) => keys -> chol(bcastAtA).solveRight(block) - } - - Q -> inCoreR - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala deleted file mode 100644 index 2c010bb..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.decompositions - -import org.apache.mahout.math.{Matrices, Vector} -import org.apache.mahout.math.scalabindings._ -import RLikeOps._ -import org.apache.mahout.math.drm._ -import RLikeDrmOps._ -import org.apache.mahout.common.RandomUtils - -object DSPCA { - - /** - * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf" - * document of the MAHOUT-817. - * - * @param drmA input matrix A - * @param k request SSVD rank - * @param p oversampling parameter - * @param q number of power iterations (hint: use either 0 or 1) - * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them - * e.g. save them to hdfs in order to trigger their computation. 
- */ - def dspca[K](drmA: DrmLike[K], - k: Int, - p: Int = 15, - q: Int = 0, - cacheHint: CacheHint.CacheHint = CacheHint.MEMORY_ONLY): - (DrmLike[K], DrmLike[Int], Vector) = { - - // Some mapBlock() calls need it - implicit val ktag = drmA.keyClassTag - - val drmAcp = drmA.checkpoint(cacheHint) - implicit val ctx = drmAcp.context - - val m = drmAcp.nrow - val n = drmAcp.ncol - assert(k <= (m min n), "k cannot be greater than smaller of m, n.") - val pfxed = safeToNonNegInt((m min n) - k min p) - - // Actual decomposition rank - val r = k + pfxed - - // Dataset mean - val mu = drmAcp.colMeans - - val mtm = mu dot mu - - // We represent Omega by its seed. - val omegaSeed = RandomUtils.getRandom().nextInt() - val omega = Matrices.symmetricUniformView(n, r, omegaSeed) - - // This done in front in a single-threaded fashion for now. Even though it doesn't require any - // memory beyond that is required to keep xi around, it still might be parallelized to backs - // for significantly big n and r. TODO - val s_o = omega.t %*% mu - - val bcastS_o = drmBroadcast(s_o) - val bcastMu = drmBroadcast(mu) - - var drmY = drmAcp.mapBlock(ncol = r) { - case (keys, blockA) ⇒ - val s_o:Vector = bcastS_o - val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed) - for (row ← 0 until blockY.nrow) blockY(row, ::) -= s_o - keys → blockY - } - // Checkpoint Y - .checkpoint(cacheHint) - - var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint(cacheHint) - - var s_q = drmQ.colSums() - var bcastVarS_q = drmBroadcast(s_q) - - // This actually should be optimized as identically partitioned map-side A'B since A and Q should - // still be identically partitioned. - var drmBt = (drmAcp.t %*% drmQ).checkpoint(cacheHint) - - var s_b = (drmBt.t %*% mu).collect(::, 0) - var bcastVarS_b = drmBroadcast(s_b) - - for (i ← 0 until q) { - - // These closures don't seem to live well with outside-scope vars. This doesn't record closure - // attributes correctly. 
So we create additional set of vals for broadcast vars to properly - // create readonly closure attributes in this very scope. - val bcastS_q = bcastVarS_q - val bcastMuInner = bcastMu - - // Fix Bt as B' -= xi cross s_q - drmBt = drmBt.mapBlock() { - case (keys, block) ⇒ - val s_q: Vector = bcastS_q - val mu: Vector = bcastMuInner - keys.zipWithIndex.foreach { - case (key, idx) ⇒ block(idx, ::) -= s_q * mu(key) - } - keys → block - } - - drmY.uncache() - drmQ.uncache() - - val bCastSt_b = drmBroadcast(s_b -=: mtm * s_q) - - drmY = (drmAcp %*% drmBt) - // Fix Y by subtracting st_b from each row of the AB' - .mapBlock() { - case (keys, block) ⇒ - val st_b: Vector = bCastSt_b - block := { (_, c, v) ⇒ v - st_b(c) } - keys → block - } - // Checkpoint Y - .checkpoint(cacheHint) - - drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint(cacheHint) - - s_q = drmQ.colSums() - bcastVarS_q = drmBroadcast(s_q) - - // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not - // identically partitioned anymore. - drmBt = (drmAcp.t %*% drmQ).checkpoint(cacheHint) - - s_b = (drmBt.t %*% mu).collect(::, 0) - bcastVarS_b = drmBroadcast(s_b) - } - - val c = s_q cross s_b - val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(cacheHint).collect -=: - c -=: c.t +=: mtm *=: (s_q cross s_q) - val (inCoreUHat, d) = eigen(inCoreBBt) - val s = d.sqrt - - // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags - // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it? 
- val drmU = drmQ %*% inCoreUHat - val drmV = drmBt %*% (inCoreUHat %*% diagv(1 / s)) - - (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k)) - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala deleted file mode 100644 index d917d11..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala +++ /dev/null @@ -1,100 +0,0 @@ -package org.apache.mahout.math.decompositions - -import org.apache.mahout.math.{Matrices, Matrix, Vector} -import org.apache.mahout.math.scalabindings._ -import RLikeOps._ -import org.apache.mahout.math.drm._ -import RLikeDrmOps._ -import org.apache.mahout.common.RandomUtils -import org.apache.mahout.logging._ - -object DSSVD { - - private final implicit val log = getLog(DSSVD.getClass) - - /** - * Distributed Stochastic Singular Value decomposition algorithm. - * - * @param drmA input matrix A - * @param k request SSVD rank - * @param p oversampling parameter - * @param q number of power iterations - * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them - * e.g. save them to hdfs in order to trigger their computation. 
- */ - def dssvd[K](drmA: DrmLike[K], - k: Int, - p: Int = 15, - q: Int = 0, - cacheHint: CacheHint.CacheHint = CacheHint.MEMORY_ONLY): - - (DrmLike[K], DrmLike[Int], Vector) = { - - // Some mapBlock() calls need it - implicit val ktag = drmA.keyClassTag - - val drmAcp = drmA.checkpoint(cacheHint) - - val m = drmAcp.nrow - val n = drmAcp.ncol - assert(k <= (m min n), "k cannot be greater than smaller of m, n.") - val pfxed = safeToNonNegInt((m min n) - k min p) - - // Actual decomposition rank - val r = k + pfxed - - // We represent Omega by its seed. - val omegaSeed = RandomUtils.getRandom().nextInt() - - // Compute Y = A*Omega. Instead of redistributing view, we redistribute the Omega seed only and - // instantiate the Omega random matrix view in the backend instead. That way serialized closure - // is much more compact. - var drmY = drmAcp.mapBlock(ncol = r) { - case (keys, blockA) ⇒ - val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed) - keys → blockY - }.checkpoint(cacheHint) - - var drmQ = dqrThin(drmY)._1 - // Checkpoint Q if last iteration - if (q == 0) drmQ = drmQ.checkpoint(cacheHint) - - trace(s"dssvd:drmQ=${drmQ.collect}.") - - // This actually should be optimized as identically partitioned map-side A'B since A and Q should - // still be identically partitioned. 
- var drmBt = drmAcp.t %*% drmQ - // Checkpoint B' if last iteration - if (q == 0) drmBt = drmBt.checkpoint(cacheHint) - - trace(s"dssvd:drmB'=${drmBt.collect}.") - - for (i ← 0 until q) { - drmY = drmAcp %*% drmBt - drmQ = dqrThin(drmY.checkpoint(cacheHint))._1 - // Checkpoint Q if last iteration - if (i == q - 1) drmQ = drmQ.checkpoint(cacheHint) - - // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not - // identically partitioned anymore.` - drmBt = drmAcp.t %*% drmQ - // Checkpoint B' if last iteration - if (i == q - 1) drmBt = drmBt.checkpoint(cacheHint) - } - - val mxBBt:Matrix = drmBt.t %*% drmBt - - trace(s"dssvd: BB'=$mxBBt.") - - val (inCoreUHat, d) = eigen(mxBBt) - val s = d.sqrt - - // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags - // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it? - val drmU = drmQ %*% inCoreUHat - val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s)) - - (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k)) - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala deleted file mode 100644 index fba9517..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.decompositions - -import scala.math._ -import org.apache.mahout.math.{Matrices, Matrix} -import org.apache.mahout.common.RandomUtils -import org.apache.log4j.Logger -import org.apache.mahout.math.scalabindings._ -import RLikeOps._ - -private[math] object SSVD { - - private val log = Logger.getLogger(SSVD.getClass) - - /** - * In-core SSVD algorithm. - * - * @param a input matrix A - * @param k request SSVD rank - * @param p oversampling parameter - * @param q number of power iterations - * @return (U,V,s) - */ - def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = { - val m = a.nrow - val n = a.ncol - if (k > min(m, n)) - throw new IllegalArgumentException( - "k cannot be greater than smaller of m,n") - val pfxed = min(p, min(m, n) - k) - - // Actual decomposition rank - val r = k + pfxed - - val rnd = RandomUtils.getRandom - val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt) - - var y = a %*% omega - var yty = y.t %*% y - val at = a.t - var ch = chol(yty) - assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD") - var bt = ch.solveRight(at %*% y) - - // Power iterations - for (i ← 0 until q) { - y = a %*% bt - yty = y.t %*% y - ch = chol(yty) - bt = ch.solveRight(at %*% y) - } - - val bbt = bt.t %*% bt - val (uhat, d) = eigen(bbt) - - val s = d.sqrt - val u = ch.solveRight(y) %*% uhat - val v = bt %*% (uhat %*% diagv(1 /: 
s)) - - (u(::, 0 until k), v(::, 0 until k), s(0 until k)) - } - - /** - * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This - * follows the solution outlined in MAHOUT-817. For in-core version it, for most part, is supposed - * to save some memory for sparse inputs by removing direct mean subtraction.<P> - * - * Hint: Usually one wants to use AV which is approsimately USigma, i.e.<code>u %*%: diagv(s)</code>. - * If retaining distances and orignal scaled variances not that important, the normalized PCA space - * is just U. - * - * Important: data points are considered to be rows. - * - * @param a input matrix A - * @param k request SSVD rank - * @param p oversampling parameter - * @param q number of power iterations - * @return (U,V,s) - */ - def spca(a:Matrix, k: Int, p: Int = 15, q: Int = 0) = { - val m = a.nrow - val n = a.ncol - if (k > min(m, n)) - throw new IllegalArgumentException( - "k cannot be greater than smaller of m,n") - val pfxed = min(p, min(m, n) - k) - - // Actual decomposition rank - val r = k + pfxed - - val rnd = RandomUtils.getRandom - val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt) - - // Dataset mean - val mu = a.colMeans() - val mtm = mu dot mu - - if (log.isDebugEnabled) log.debug("xi=%s".format(mu)) - - var y = a %*% omega - - // Fixing y - val s_o = omega.t %*% mu - y := ((r,c,v) ⇒ v - s_o(c)) - - var yty = y.t %*% y - var ch = chol(yty) -// assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD") - - // This is implicit Q of QR(Y) - var qm = ch.solveRight(y) - var bt = a.t %*% qm - var s_q = qm.colSums() - var s_b = bt.t %*% mu - - // Power iterations - for (i ← 0 until q) { - - // Fix bt - bt -= mu cross s_q - - y = a %*% bt - - // Fix Y again. 
- val st_b = s_b -=: mtm * s_q - y := ((r,c,v) ⇒ v - st_b(c)) - - yty = y.t %*% y - ch = chol(yty) - qm = ch.solveRight(y) - bt = a.t %*% qm - s_q = qm.colSums() - s_b = bt.t %*% mu - } - - val c = s_q cross s_b - - // BB' computation becomes - val bbt = bt.t %*% bt -= c -= c.t += (mtm * s_q cross s_q) - - val (uhat, d) = eigen(bbt) - - val s = d.sqrt - val u = qm %*% uhat - val v = bt %*% (uhat %*%: diagv(1 /: s)) - - (u(::, 0 until k), v(::, 0 until k), s(0 until k)) - - } - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala deleted file mode 100644 index a7b829f..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.mahout.math - -import scala.reflect.ClassTag -import org.apache.mahout.math.drm.DrmLike - -/** - * This package holds all decomposition and factorization-like methods, all that we were able to make - * distributed engine-independent so far, anyway. - */ -package object decompositions { - - // ================ In-core decompositions =================== - - /** - * In-core SSVD algorithm. - * - * @param a input matrix A - * @param k request SSVD rank - * @param p oversampling parameter - * @param q number of power iterations - * @return (U,V,s) - */ - def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = SSVD.ssvd(a, k, p, q) - - /** - * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This - * follows the solution outlined in MAHOUT-817. For in-core version it, for most part, is supposed - * to save some memory for sparse inputs by removing direct mean subtraction.<P> - * - * Hint: Usually one wants to use AV which is approsimately USigma, i.e.<code>u %*%: diagv(s)</code>. - * If retaining distances and orignal scaled variances not that important, the normalized PCA space - * is just U. - * - * Important: data points are considered to be rows. - * - * @param a input matrix A - * @param k request SSVD rank - * @param p oversampling parameter - * @param q number of power iterations - * @return (U,V,s) - */ - def spca(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = - SSVD.spca(a = a, k = k, p = p, q = q) - - // ============== Distributed decompositions =================== - - /** - * Distributed _thin_ QR. A'A must fit in a memory, i.e. if A is m x n, then n should be pretty - * controlled (<5000 or so). <P> - * - * It is recommended to checkpoint A since it does two passes over it. <P> - * - * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so - * their RDD should be able to zip successfully. 
- */ - def dqrThin[K: ClassTag](drmA: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = - DQR.dqrThin(drmA, checkRankDeficiency) - - /** - * Distributed Stochastic Singular Value decomposition algorithm. - * - * @param drmA input matrix A - * @param k request SSVD rank - * @param p oversampling parameter - * @param q number of power iterations - * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them - * e.g. save them to hdfs in order to trigger their computation. - */ - def dssvd[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0): - (DrmLike[K], DrmLike[Int], Vector) = DSSVD.dssvd(drmA, k, p, q) - - /** - * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf" - * document of the MAHOUT-817. - * - * @param drmA input matrix A - * @param k request SSVD rank - * @param p oversampling parameter - * @param q number of power iterations (hint: use either 0 or 1) - * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them - * e.g. save them to hdfs in order to trigger their computation. - */ - def dspca[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0): - (DrmLike[K], DrmLike[Int], Vector) = DSPCA.dspca(drmA, k, p, q) - - /** Result for distributed ALS-type two-component factorization algorithms */ - type FactorizationResult[K] = ALS.Result[K] - - /** Result for distributed ALS-type two-component factorization algorithms, in-core matrices */ - type FactorizationResultInCore = ALS.InCoreResult - - /** - * Run ALS. - * <P> - * - * Example: - * - * <pre> - * val (u,v,errors) = als(input, k).toTuple - * </pre> - * - * ALS runs until (rmse[i-1]-rmse[i])/rmse[i-1] < convergenceThreshold, or i==maxIterations, - * whichever earlier. 
- * <P> - * - * @param drmA The input matrix - * @param k required rank of decomposition (number of cols in U and V results) - * @param convergenceThreshold stop sooner if (rmse[i-1] - rmse[i])/rmse[i - 1] is less than this - * value. If <=0 then we won't compute RMSE and use convergence test. - * @param lambda regularization rate - * @param maxIterations maximum iterations to run regardless of convergence - * @tparam K row key type of the input (100 is probably more than enough) - * @return { @link org.apache.mahout.math.drm.decompositions.ALS.Result} - */ - def dals[K: ClassTag]( - drmA: DrmLike[K], - k: Int = 50, - lambda: Double = 0.0, - maxIterations: Int = 10, - convergenceThreshold: Double = 0.10 - ): FactorizationResult[K] = - ALS.dals(drmA, k, lambda, maxIterations, convergenceThreshold) - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala deleted file mode 100644 index b86e286..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.drm - -/** Broadcast variable abstraction */ -trait BCast[T] extends java.io.Closeable { - def value:T - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala deleted file mode 100644 index 3755f31..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.mahout.math.drm - -object CacheHint extends Enumeration { - - type CacheHint = Value - - val NONE, - DISK_ONLY, - DISK_ONLY_2, - MEMORY_ONLY, - MEMORY_ONLY_2, - MEMORY_ONLY_SER, - MEMORY_ONLY_SER_2, - MEMORY_AND_DISK, - MEMORY_AND_DISK_2, - MEMORY_AND_DISK_SER, - MEMORY_AND_DISK_SER_2 = Value - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala deleted file mode 100644 index 31f8097..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.drm - -import org.apache.mahout.math.Matrix -import org.apache.mahout.math.drm.CacheHint.CacheHint - -/** - * Checkpointed DRM API. This is a matrix that has optimized RDD lineage behind it and can be - * therefore collected or saved. - * - * @tparam K matrix key type (e.g. 
import org.apache.mahout.math.Matrix
import org.apache.mahout.math.drm.CacheHint.CacheHint

/**
 * Checkpointed DRM API. This is a matrix that has optimized lineage behind it and can
 * therefore be collected or saved.
 *
 * @tparam K matrix key type (e.g. the keys of sequence files once persisted)
 */
trait CheckpointedDrm[K] extends DrmLike[K] {

  /** Collect the distributed matrix as an in-core [[org.apache.mahout.math.Matrix]]. */
  def collect: Matrix

  /**
   * Write the matrix to the distributed file system.
   *
   * The result type is spelled out as `Unit`; the original relied on procedure syntax,
   * which is deprecated.
   *
   * @param path target DFS path
   */
  def dfsWrite(path: String): Unit

  /** Cache hint this checkpoint was declared with. */
  val cacheHint: CacheHint

  /** If this checkpoint is already declared cached, uncache. */
  def uncache(): this.type

  /**
   * Changes the number of rows without touching the underlying data.
   *
   * @param n new row cardinality
   */
  def newRowCardinality(n: Int): CheckpointedDrm[K]

}
import org.apache.mahout.math._

import org.apache.mahout.math.scalabindings.RLikeOps._

/**
 * Additional experimental operations over a [[CheckpointedDrm]].
 *
 * Every method forwards to the same-named operation reachable through `drm.context`,
 * passing the wrapped checkpoint as the first argument.
 *
 * @param drm the checkpoint the operations are applied to
 * @tparam K row key type of the wrapped matrix
 */
class CheckpointedOps[K](val drm: CheckpointedDrm[K]) {

  /** Column sums. At this point this runs on checkpoint and collects in-core vector. */
  def colSums(): Vector =
    drm.context.colSums(drm)

  /**
   * Column counts: counts the non-zero values per column. At this point this runs on
   * checkpoint and collects in-core vector.
   */
  def numNonZeroElementsPerColumn(): Vector =
    drm.context.numNonZeroElementsPerColumn(drm)

  /** Column means. */
  def colMeans(): Vector =
    drm.context.colMeans(drm)

  /**
   * Optional engine-specific all-reduce tensor operation.
   *
   * @param bmf block map function applied to the blocks of the checkpoint
   * @param rf  reduce function combining two mapped blocks; defaults to in-place `+=`
   */
  def allreduceBlock(bmf: BlockMapFunc2[K], rf: BlockReduceFunc = _ += _): Matrix =
    drm.context.allreduceBlock(drm, bmf, rf)

  /** Second norm, as computed by the backing engine. */
  def norm(): Double =
    drm.context.norm(drm)
}
import java.io.Closeable

/**
 * Distributed context (a.k.a. distributed session handle).
 *
 * It is [[java.io.Closeable]] and exposes the engine it belongs to.
 */
trait DistributedContext extends Closeable {

  /** The distributed engine backing this context. */
  val engine: DistributedEngine

}
import org.apache.mahout.math.indexeddataset._

import logical._
import org.apache.mahout.math._
import scalabindings._
import RLikeOps._
import DistributedEngine._
import org.apache.log4j.Logger

import scala.reflect.ClassTag

/** Abstraction of optimizer/distributed engine */
trait DistributedEngine {

  /**
   * First optimization pass. Returns a plan that can be passed on to `toPhysical`. This rewrite
   * may introduce logical constructs (including engine-specific ones) that the user DSL cannot
   * even produce per se.
   *
   * A particular physical engine implementation may choose to either use the default rewrites
   * (`pass1`, then `pass2`, then `pass3` of the companion object) or build its own rewriting
   * rules.
   *
   * @param action logical operator tree to rewrite
   * @return the rewritten operator tree
   */
  def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = pass3(pass2(pass1(action)))

  /** Second optimizer pass. Translate previously rewritten logical pipeline into physical engine plan. */
  def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K]

  /** Engine-specific colSums implementation based on a checkpoint. */
  def colSums[K](drm: CheckpointedDrm[K]): Vector

  /** Optional engine-specific all reduce tensor operation. */
  def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix

  /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */
  def numNonZeroElementsPerColumn[K](drm: CheckpointedDrm[K]): Vector

  /** Engine-specific colMeans implementation based on a checkpoint. */
  def colMeans[K](drm: CheckpointedDrm[K]): Vector

  /** Engine-specific norm implementation based on a checkpoint. */
  def norm[K](drm: CheckpointedDrm[K]): Double

  /** Broadcast support for in-core vectors. */
  def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector]

  /** Broadcast support for in-core matrices. */
  def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix]

  /**
   * Load DRM from hdfs (as in Mahout DRM format).
   *
   * @param path   The DFS path to load from
   * @param parMin Minimum parallelism after load (equivalent to #par(min=...)).
   */
  def drmDfsRead(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_]

  /** Parallelize in-core matrix as the backend engine distributed matrix, using row ordinal indices as data set keys. */
  def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)(implicit sc: DistributedContext):
  CheckpointedDrm[Int]

  /** Parallelize in-core matrix as the backend engine distributed matrix, using row labels as data set keys. */
  def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)(implicit sc: DistributedContext):
  CheckpointedDrm[String]

  /** This creates an empty DRM with specified number of partitions and cardinality. */
  def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)(implicit sc: DistributedContext):
  CheckpointedDrm[Int]

  /** Creates empty DRM with non-trivial height */
  def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)(implicit sc: DistributedContext):
  CheckpointedDrm[Long]

  /**
   * Convert non-int-keyed matrix to an int-keyed, computing optionally mapping from old keys
   * to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix.
   */
  def drm2IntKeyed[K](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]])

  /**
   * (Optional) Sampling operation. Consistent with Spark semantics of the same.
   *
   * @param drmX        matrix to sample rows from
   * @param fraction    fraction of rows to sample
   * @param replacement whether to sample with replacement
   * @tparam K row key type
   * @return distributed matrix made of the sampled rows
   */
  def drmSampleRows[K](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K]

  /**
   * (Optional) Sampling operation returning an in-core matrix.
   *
   * @param drmX        matrix to sample rows from
   * @param numSamples  number of rows to sample
   * @param replacement whether to sample with replacement
   */
  def drmSampleKRows[K](drmX: DrmLike[K], numSamples: Int, replacement: Boolean = false): Matrix

  /**
   * Load IndexedDataset from text delimited format.
   *
   * @param src    comma delimited URIs to read from
   * @param schema defines format of file(s)
   */
  def indexedDatasetDFSRead(src: String,
      schema: Schema = DefaultIndexedDatasetReadSchema,
      existingRowIDs: Option[BiDictionary] = None)
      (implicit sc: DistributedContext):
  IndexedDataset

  /**
   * Load IndexedDataset from text delimited format, one element per line
   *
   * @param src    comma delimited URIs to read from
   * @param schema defines format of file(s)
   */
  def indexedDatasetDFSReadElements(src: String,
      schema: Schema = DefaultIndexedDatasetElementReadSchema,
      existingRowIDs: Option[BiDictionary] = None)
      (implicit sc: DistributedContext):
  IndexedDataset

}

/** Default rewrite passes used by [[DistributedEngine.optimizerRewrite]]. */
object DistributedEngine {

  private val log = Logger.getLogger(DistributedEngine.getClass)

  /**
   * This is mostly multiplication operations rewrites.
   *
   * Case order matters: earlier, more specific patterns shadow the generic unary/binary
   * pass-through cases at the bottom.
   */
  private def pass1[K](action: DrmLike[K]): DrmLike[K] = {

    action match {

      // Logical but previously had checkpoint attached to it already that has some caching policy to it
      case cpa: CheckpointAction[K] if cpa.cp.exists(_.cacheHint != CacheHint.NONE) => cpa.cp.get

      // self element-wise rewrite
      case OpAewB(a, b, op) if a == b =>
        op match {
          case "*" => OpAewUnaryFunc(pass1(a), (x) => x * x)
          case "/" => OpAewUnaryFunc(pass1(a), (x) => x / x)
          // Self "+" and "-" don't make a lot of sense, but we do include it for completeness.
          case "+" => OpAewUnaryFunc(pass1(a), 2.0 * _)
          case "-" => OpAewUnaryFunc(pass1(a), (_) => 0.0)
          case _ =>
            // Same exception type and message that `require(false, ...)` produced, without
            // the unreachable `null` result that followed it.
            throw new IllegalArgumentException(s"requirement failed: Unsupported operator $op")
        }

      case OpAB(OpAt(a), b) if a == b => OpAtA(pass1(a))
      case OpABAnyKey(OpAtAnyKey(a), b) if a == b => OpAtA(pass1(a))

      // A small rule change: Now that we have removed ClassTag at the %*% operation, it doesn't
      // match b[Int] case automatically any longer. So, we need to check and rewrite it dynamically
      // and re-run pass1 again on the obtained tree.
      case OpABAnyKey(a, b) if b.keyClassTag == ClassTag.Int => pass1(OpAB(a, b.asInstanceOf[DrmLike[Int]]))
      case OpAtAnyKey(a) if a.keyClassTag == ClassTag.Int => pass1(OpAt(a.asInstanceOf[DrmLike[Int]]))

      // For now, rewrite left-multiply via transpositions, i.e.
      // inCoreA %*% B = (B' %*% inCoreA')'
      case OpTimesLeftMatrix(a, b) =>
        OpAt(OpTimesRightMatrix(A = OpAt(pass1(b)), right = a.t))

      // Add vertical row index concatenation for rbind() on DrmLike[Int] fragments
      case op@OpRbind(a, b) if op.keyClassTag == ClassTag.Int =>

        // Make sure closure sees only local vals, not attributes. We need to do these ugly casts
        // around because compiler could not infer that K is the same as Int, based on if() above.
        val ma = safeToNonNegInt(a.nrow)
        val bAdjusted = new OpMapBlock[Int, Int](A = pass1(b.asInstanceOf[DrmLike[Int]]), bmf = {
          // Shift the keys of the lower fragment by the height of the upper fragment.
          case (keys, block) => keys.map(_ + ma) -> block
        }, identicallyPartitioned = false)
        val aAdjusted = a.asInstanceOf[DrmLike[Int]]
        OpRbind(pass1(aAdjusted), bAdjusted).asInstanceOf[DrmLike[K]]

      // Stop at checkpoints
      case _: CheckpointedDrm[_] => action

      // For everything else we just pass-thru the operator arguments to optimizer
      case uop: AbstractUnaryOp[_, K] =>
        uop.A = pass1(uop.A)
        uop

      case bop: AbstractBinaryOp[_, _, K] =>
        bop.A = pass1(bop.A)
        bop.B = pass1(bop.B)
        bop
    }
  }

  /** This would remove stuff like A.t.t that previous step may have created */
  private def pass2[K](action: DrmLike[K]): DrmLike[K] = {
    action match {

      // Fusion of unary funcs into single, like 1 + x * x.
      // Since we are repeating the pass over self after rewrite, we don't need to descend into
      // arguments recursively here.
      case op1@OpAewUnaryFunc(op2@OpAewUnaryFunc(a, _, _), _, _) =>
        pass2(OpAewUnaryFuncFusion(a, op1 :: op2 :: Nil))

      // Fusion one step further, like 1 + 2 * x * x. All should be rewritten as one UnaryFuncFusion.
      // Since we are repeating the pass over self after rewrite, we don't need to descend into
      // arguments recursively here.
      case op@OpAewUnaryFuncFusion(op2@OpAewUnaryFunc(a, _, _), _) =>
        pass2(OpAewUnaryFuncFusion(a, op.ff :+ op2))

      // A.t.t => A
      case OpAt(OpAt(a)) => pass2(a)

      // Stop at checkpoints
      case _: CheckpointedDrm[_] => action

      // For everything else we just pass-thru the operator arguments to optimizer
      case uop: AbstractUnaryOp[_, K] =>
        uop.A = pass2(uop.A)
        uop
      case bop: AbstractBinaryOp[_, _, K] =>
        bop.A = pass2(bop.A)
        bop.B = pass2(bop.B)
        bop
    }
  }

  /** Some further rewrites that are conditioned on A.t.t removal */
  private def pass3[K](action: DrmLike[K]): DrmLike[K] = {
    action match {

      // matrix products.
      case OpAB(a, OpAt(b)) => OpABt(pass3(a), pass3(b))

      // AtB cases that make sense.
      case OpAB(OpAt(a), b) if a.partitioningTag == b.partitioningTag => OpAtB(pass3(a), pass3(b))
      case OpABAnyKey(OpAtAnyKey(a), b) => OpAtB(pass3(a), pass3(b))

      // Need some cost to choose between the following.
      // NOTE(review): this unguarded case currently yields the same rewrite as the guarded one
      // above; both are kept, in the original order, as a placeholder for a cost-based choice.
      case OpAB(OpAt(a), b) => OpAtB(pass3(a), pass3(b))
      // case OpAB(OpAt(a), b) => OpAt(OpABt(OpAt(pass1(b)), pass1(a)))
      case OpAB(a, b) => OpABt(pass3(a), OpAt(pass3(b)))

      // Rewrite A'x
      case OpAx(OpAt(a), x) => OpAtx(pass3(a), x)

      // Stop at checkpoints
      case _: CheckpointedDrm[_] => action

      // For everything else we just pass-thru the operator arguments to optimizer
      case uop: AbstractUnaryOp[_, K] =>
        uop.A = pass3(uop.A)
        uop
      case bop: AbstractBinaryOp[_, _, K] =>
        bop.A = pass3(bop.A)
        bop.B = pass3(bop.B)
        bop
    }
  }

}
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.math.drm.logical.OpCbindScalar

import scala.reflect.ClassTag

/**
 * Operators that take a `Double` on the left-hand side and a distributed matrix on the right,
 * e.g. `2.0 * drmA`.
 *
 * Declared as a value class (`extends AnyVal`).
 *
 * @param x the scalar on the left-hand side of the operator
 */
class DrmDoubleScalarOps(val x: Double) extends AnyVal {

  /** `x + A`, expressed as `A + x`. */
  def +[K: ClassTag](that: DrmLike[K]) = that + x

  /** `x * A`, expressed as `A * x`. */
  def *[K: ClassTag](that: DrmLike[K]) = that * x

  /** `x - A`, delegated to the right-associative `-:` operator of the matrix. */
  def -[K: ClassTag](that: DrmLike[K]) = x -: that

  /** `x / A`, delegated to the right-associative `/:` operator of the matrix. */
  def /[K: ClassTag](that: DrmLike[K]) = x /: that

  /** Binds the scalar `x` to `that` with `leftBind = true`. */
  def cbind[K: ClassTag](that: DrmLike[K]) = OpCbindScalar(A = that, x = x, leftBind = true)

}
import scala.reflect.ClassTag

/**
 * Basic DRM (distributed row matrix) trait.
 *
 * @tparam K row key type
 */
trait DrmLike[K] {

  /** Partitioning tag of this matrix. */
  protected[mahout] def partitioningTag: Long

  /** Whether this matrix can have missing rows. */
  protected[mahout] def canHaveMissingRows: Boolean

  /**
   * Distributed context, can be implicitly converted to operations on
   * [[org.apache.mahout.math.drm.DistributedEngine]].
   */
  val context: DistributedContext

  /** R-like syntax for number of rows. */
  def nrow: Long

  /** R-like syntax for number of columns. */
  def ncol: Int

  /**
   * Explicit extraction of the key class tag, since traits don't support context bound access;
   * the actual implementation knows it.
   */
  def keyClassTag: ClassTag[K]

  /**
   * Action operator. It does not necessarily mean a backend action, but it does mean running
   * the optimizer and recording the lineage since the last checkpointed DRM.
   *
   * @param cacheHint cache hint for the checkpoint; defaults to `CacheHint.MEMORY_ONLY`
   */
  def checkpoint(cacheHint: CacheHint.CacheHint = CacheHint.MEMORY_ONLY): CheckpointedDrm[K]

}
import scala.reflect.ClassTag
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.drm.logical.{OpAewUnaryFunc, OpPar, OpMapBlock, OpRowRange}

/**
 * Common Drm ops.
 *
 * @param drm the matrix the operations are applied to
 * @tparam K row key type
 */
class DrmLikeOps[K](protected[drm] val drm: DrmLike[K]) {

  /**
   * Parallelism adjustments.
   *
   * Change only one of parameters from default value to choose new parallelism adjustment
   * strategy. E.g. use
   * <pre>
   * drmA.par(auto = true)
   * </pre>
   * to use automatic parallelism adjustment.
   *
   * Parallelism here in API is fairly abstract concept, and actual value interpretation is left
   * for a particular backend strategy. However, it is usually equivalent to number of map tasks
   * or data splits.
   *
   * Note that `auto` only takes part in the argument check; the operator is built from `min`
   * and `exact` alone, so `par(auto = true)` produces `OpPar` with both split values at -1.
   *
   * @param min   If changed from default, ensures the product has at least that much parallelism.
   * @param exact if changed from default, ensures the pipeline product has exactly that much
   *              parallelism.
   * @param auto  If changed from default, engine-specific automatic parallelism adjustment
   *              strategy is applied.
   * @throws IllegalArgumentException if none of the three parameters was changed to a valid value
   */
  def par(min: Int = -1, exact: Int = -1, auto: Boolean = false) = {
    require(min > 0 || exact > 0 || auto, "Invalid argument")
    OpPar(drm, minSplits = min, exactSplits = exact)
  }

  /**
   * Map matrix block-wise vertically. Blocks of the new matrix can be modified original block
   * matrices; or they could be completely new matrices with new keyset. In the latter case,
   * output matrix width must be specified with <code>ncol</code> parameter.
   *
   * New block heights must be of the same height as the original geometry.
   *
   * @param ncol                   new matrix' width (only needed if width changes).
   * @param identicallyPartitioned passed through to the `OpMapBlock` operator
   * @param bmf                    block map function
   * @tparam R row key type of the result
   * @return logical operator representing the mapped matrix
   */
  def mapBlock[R: ClassTag](ncol: Int = -1, identicallyPartitioned: Boolean = true)
      (bmf: BlockMapFunc[K, R]): DrmLike[R] =
    new OpMapBlock[K, R](
      A = drm,
      bmf = bmf,
      _ncol = ncol,
      identicallyPartitioned = identicallyPartitioned
    )

  /**
   * Slicing the DRM, e.g. A(0 until 5, 5 until 15).
   *
   * The all-range is denoted by '::', e.g.: A(::, 0 until 5).
   *
   * A non-all row range is only accepted for Int-keyed matrices; any other key type results in
   * an `IllegalArgumentException`.
   *
   * @param rowRange Row range. This must be '::' (all-range) unless matrix rows are keyed by Int key.
   * @param colRange col range. Must be a sub-range of <code>0 until ncol</code>. '::' denotes all-range.
   */
  def apply(rowRange: Range, colRange: Range): DrmLike[K] = {

    import RLikeDrmOps._
    import RLikeOps._

    implicit val ktag = drm.keyClassTag

    val rowSrc: DrmLike[K] = if (rowRange != ::) {

      if (ClassTag.Int == ktag) {

        assert(rowRange.head >= 0 && rowRange.last < drm.nrow, "rows range out of range")
        // The class tag check above established K == Int; the compiler cannot see that.
        val intKeyed = drm.asInstanceOf[DrmLike[Int]]

        new OpRowRange(A = intKeyed, rowRange = rowRange).asInstanceOf[DrmLike[K]]

      } else throw new IllegalArgumentException("non-all row range is only supported for Int-keyed DRMs.")

    } else drm

    if (colRange != ::) {

      assert(colRange.head >= 0 && colRange.last < drm.ncol, "col range out of range")

      // Use mapBlock operator to do in-core subranging.
      rowSrc.mapBlock(ncol = colRange.length)({
        case (keys, block) => keys -> block(::, colRange)
      })

    } else rowSrc
  }

  /**
   * Apply a function element-wise.
   *
   * @param f         element-wise function
   * @param evalZeros Do we have to process zero elements? true, false, auto: if auto, we will test
   *                  the supplied function for `f(0) != 0`, and depending on the result, will
   *                  decide if we want evaluation for zero elements. WARNING: the AUTO setting
   *                  may not always work correctly for functions that are meant to run in a specific
   *                  backend context, or non-deterministic functions, such as {-1,0,1} random
   *                  generators.
   * @return new DRM with the element-wise function applied.
   */
  def apply(f: Double => Double, evalZeros: AutoBooleanEnum.T = AutoBooleanEnum.AUTO) = {
    val ezeros = evalZeros match {
      case AutoBooleanEnum.TRUE => true
      case AutoBooleanEnum.FALSE => false
      case AutoBooleanEnum.AUTO => f(0) != 0
    }
    new OpAewUnaryFunc[K](drm, f, ezeros)
  }
}
import scala.reflect.ClassTag
import collection._
import JavaConversions._
import org.apache.mahout.math.{Vector, Matrix}
import org.apache.mahout.math.drm.logical._
import org.apache.mahout.math.scalabindings._
import RLikeOps._

/**
 * R-like operators over a distributed matrix. Each operator only builds a logical operator
 * node; nothing is computed here.
 *
 * @param drm the matrix the operators are applied to
 * @tparam K row key type
 */
class RLikeDrmOps[K](drm: DrmLike[K]) extends DrmLikeOps[K](drm) {

  import RLikeDrmOps._
  import org.apache.mahout.math.scalabindings._

  /** Element-wise sum of two matrices. */
  def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "+")

  /** Element-wise difference of two matrices. */
  def -(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "-")

  /** Element-wise (Hadamard) product of two matrices. */
  def *(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "*")

  /** Element-wise quotient of two matrices. */
  def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "/")

  /** `A + x`; zero elements are evaluated as well. */
  def +(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = _ + that, evalZeros = true)

  /** `x + A` (right-associative form); zero elements are evaluated as well. */
  def +:(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = that + _, evalZeros = true)

  /** `A - x`; zero elements are evaluated as well. */
  def -(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = _ - that, evalZeros = true)

  /** `x - A` (right-associative form); zero elements are evaluated as well. */
  def -:(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = that - _, evalZeros = true)

  /** `A * x`; `evalZeros` is left at the operator's default. */
  def *(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = _ * that)

  /** `x * A` (right-associative form); `evalZeros` is left at the operator's default. */
  def *:(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = that * _)

  /** Element-wise power. */
  def ^(that: Double): DrmLike[K] = that match {
    // Special handling of x ^2 and x ^ 0.5: we want consistent handling of x ^ 2 and x * x since
    // pow(x,2) function return results different from x * x; but much of the code uses this
    // interchangeably. Not having this done will create things like NaN entries on main diagonal
    // of a distance matrix.
    case 2.0 => OpAewUnaryFunc[K](A = this, f = x => x * x)
    case 0.5 => OpAewUnaryFunc[K](A = this, f = math.sqrt _)
    case _ => OpAewUnaryFunc[K](A = this, f = math.pow(_, that))
  }

  /** `A / x`; zero elements only need evaluation when the divisor itself is zero. */
  def /(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = _ / that, evalZeros = that == 0.0)

  /** `x / A` (right-associative form); zero elements are evaluated as well. */
  def /:(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = that / _, evalZeros = true)

  /** Matrix product with a distributed matrix of any key type. */
  def :%*%[B](that: DrmLike[B]): DrmLike[K] = OpABAnyKey[B,K](A = this.drm, B=that)

  /** Matrix product with a distributed matrix of any key type. */
  def %*%[B](that: DrmLike[B]): DrmLike[K] = this :%*% that

  /** Matrix product with an in-core matrix on the right. */
  def :%*%(that: Matrix): DrmLike[K] = OpTimesRightMatrix[K](A = this.drm, right = that)

  /** Matrix product with an in-core matrix on the right. */
  def %*%(that: Matrix): DrmLike[K] = this :%*% that

  /** Matrix-vector product with an in-core vector. */
  def :%*%(that: Vector): DrmLike[K] = OpAx(A = this.drm, x = that)

  /** Matrix-vector product with an in-core vector. */
  def %*%(that: Vector): DrmLike[K] = :%*%(that)

  /** Transposition; the result is Int-keyed. */
  def t: DrmLike[Int] = OpAtAnyKey(A = drm)

  /** Column-wise concatenation with another distributed matrix. */
  def cbind(that: DrmLike[K]): DrmLike[K] = OpCbind(A = this.drm, B = that)

  /** Binds the scalar `that` to this matrix with `leftBind = false`. */
  def cbind(that: Double): DrmLike[K] = OpCbindScalar(A = this.drm, x = that, leftBind = false)

  /** Row-wise concatenation with another distributed matrix. */
  def rbind(that: DrmLike[K]): DrmLike[K] = OpRbind(A = this.drm, B = that)

  /**
   * `rowSums` method for non-int keyed matrices.
   *
   * Slight problem here is the limitation of in-memory representation of Colt's Matrix, which can
   * only have String row labels. Therefore, internally we do ".toString()" call on each key object,
   * and then put it into [[Matrix]] row label bindings, at which point they are coerced to be Strings.
   *
   * This is obviously a suboptimal behavior, so as TODO we have here future enhancements of `collect'.
   *
   * NOTE(review): iterating the Java label map relies on the implicit `JavaConversions`
   * import of this file, which is deprecated in newer Scala versions.
   *
   * @return map of row keys into row sums, front-end collected.
   */
  def rowSumsMap(): Map[String, Double] = {

    implicit val ktag = drm.keyClassTag

    val m = drm.mapBlock(ncol = 1) { case (keys, block) =>
      keys -> dense(block.rowSums).t
    }.collect
    m.getRowLabelBindings.map { case (key, idx) => key -> m(idx, 0)}
  }
}

/** Additional R-like operators that are only available on Int-keyed distributed matrices. */
class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) {

  import org.apache.mahout.math._
  import scalabindings._
  import RLikeDrmOps._

  /** Transposition of an Int-keyed matrix. */
  override def t: DrmLike[Int] = OpAt(A = drm)

  /** `that %*% this`: matrix product with the Int-keyed matrix on the right-hand side. */
  def %*%:[K: ClassTag](that: DrmLike[K]): DrmLike[K] = OpAB[K](A = that, B = this.drm)

  /** `that %*% this` with an in-core matrix on the left-hand side. */
  def %*%:(that: Matrix): DrmLike[Int] = OpTimesLeftMatrix(left = that, A = this.drm)

  /** Row sums. This is of course applicable to Int-keyed distributed matrices only. */
  def rowSums(): Vector = {
    drm.mapBlock(ncol = 1) { case (keys, block) =>
      // Collect block-wise rowsums and output them as one-column matrix.
      keys -> dense(block.rowSums).t
    }
      .collect(::, 0)
  }

  /** Counts the non-zeros elements in each row returning a vector of the counts */
  def numNonZeroElementsPerRow(): Vector = {
    drm.mapBlock(ncol = 1) { case (keys, block) =>
      // Collect block-wise row non-zero counts and output them as a one-column matrix.
      keys -> dense(block.numNonZeroElementsPerRow).t
    }
      .collect(::, 0)
  }

  /** Row means */
  def rowMeans(): Vector = {
    drm.mapBlock(ncol = 1) { case (keys, block) =>
      // Collect block-wise row means and output them as one-column matrix.
      keys -> dense(block.rowMeans).t
    }
      .collect(::, 0)
  }

  /**
   * Return diagonal vector.
   *
   * @throws IllegalArgumentException if the matrix is not square
   */
  def diagv: Vector = {
    require(drm.ncol == drm.nrow, "Must be square to extract diagonal")
    drm.mapBlock(ncol = 1) { case (keys, block) =>
      // For a block row, the diagonal element sits in the column named by that row's key.
      keys -> dense(for (r <- block.view) yield r(keys(r.index))).t
    }
      .collect(::, 0)
  }

}

/** Implicit conversions between distributed matrices and their operator wrappers. */
object RLikeDrmOps {

  implicit def double2ScalarOps(x: Double) = new DrmDoubleScalarOps(x)

  implicit def drmInt2RLikeOps(drm: DrmLike[Int]): RLikeDrmIntOps = new RLikeDrmIntOps(drm)

  implicit def drm2RLikeOps[K](drm: DrmLike[K]): RLikeDrmOps[K] = new RLikeDrmOps[K](drm)

  implicit def rlikeOps2Drm[K](ops: RLikeDrmOps[K]): DrmLike[K] = ops.drm

  implicit def ops2Drm[K](ops: DrmLikeOps[K]): DrmLike[K] = ops.drm

  implicit def drm2cpops[K](drm: DrmLike[K]): CheckpointedOps[K] = new CheckpointedOps(drm)
}
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.drm.logical - -import org.apache.mahout.math.drm.{DistributedContext, DrmLike} - -/** - * Any logical binary operator (such as A + B). - * <P/> - * - * Any logical operator derived from this is also capabile of triggering optimizer checkpoint, hence, - * it also inherits CheckpointAction. - * <P/> - * - * @tparam A LHS key type - * @tparam B RHS key type - * @tparam K result key type - */ -abstract class AbstractBinaryOp[A, B, K] - extends CheckpointAction[K] with DrmLike[K] { - - protected[drm] var A: DrmLike[A] - - protected[drm] var B: DrmLike[B] - - lazy val context: DistributedContext = A.context - - protected[mahout] def canHaveMissingRows: Boolean = false -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala deleted file mode 100644 index 6a70aec..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.drm.logical - -import org.apache.mahout.math.drm.{DistributedContext, DrmLike} - -/** Abstract unary operator */ -abstract class AbstractUnaryOp[A, K] - extends CheckpointAction[K] with DrmLike[K] { - - protected[mahout] var A: DrmLike[A] - - lazy val context: DistributedContext = A.context - - override protected[mahout] lazy val canHaveMissingRows: Boolean = A.canHaveMissingRows - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala deleted file mode 100644 index aa1d8bc..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.drm.logical - -import scala.util.Random -import org.apache.mahout.math.drm._ - -/** Implementation of distributed expression checkpoint and optimizer. */ -abstract class CheckpointAction[K] extends DrmLike[K] { - - protected[mahout] lazy val partitioningTag: Long = Random.nextLong() - - private[mahout] var cp:Option[CheckpointedDrm[K]] = None - - def isIdenticallyPartitioned(other:DrmLike[_]) = - partitioningTag!= 0L && partitioningTag == other.partitioningTag - - /** - * Action operator -- does not necessary means Spark action; but does mean running BLAS optimizer - * and writing down Spark graph lineage since last checkpointed DRM. 
- */ - def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = cp match { - case None => - implicit val cpTag = this.keyClassTag - val plan = context.optimizerRewrite(this) - val physPlan = context.toPhysical(plan, cacheHint) - cp = Some(physPlan) - physPlan - case Some(cp) => cp - } - -} - http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala deleted file mode 100644 index e5316a0..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.mahout.math.drm.logical - -import scala.reflect.ClassTag -import org.apache.mahout.math.drm.DrmLike - -/** Logical AB */ -case class OpAB[K]( - override var A: DrmLike[K], - override var B: DrmLike[Int]) - extends AbstractBinaryOp[K, Int, K] { - - assert(A.ncol == B.nrow, "Incompatible operand geometry") - - /** - * Explicit extraction of key class Tag since traits don't support context bound access; but actual - * implementation knows it - */ - override def keyClassTag: ClassTag[K] = A.keyClassTag - - /** R-like syntax for number of rows. */ - def nrow: Long = A.nrow - - /** R-like syntax for number of columns */ - def ncol: Int = B.ncol - - /** Non-zero element count */ - def nNonZero: Long = - // TODO: for purposes of cost calculation, approximate based on operands - throw new UnsupportedOperationException -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala deleted file mode 100644 index 8437cdd..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.drm.logical - -import scala.reflect.ClassTag -import org.apache.mahout.math.drm.DrmLike - -/** Logical AB */ -case class OpABAnyKey[B, K ]( - override var A: DrmLike[K], - override var B: DrmLike[B]) - extends AbstractBinaryOp[K, B, K] { - - assert(A.ncol == B.nrow, "Incompatible operand geometry") - - - /** - * Explicit extraction of key class Tag since traits don't support context bound access; but actual - * implementation knows it - */ - override def keyClassTag: ClassTag[K] = A.keyClassTag - - /** R-like syntax for number of rows. */ - def nrow: Long = A.nrow - - /** R-like syntax for number of columns */ - def ncol: Int = B.ncol - - /** Non-zero element count */ - def nNonZero: Long = - // TODO: for purposes of cost calculation, approximate based on operands - throw new UnsupportedOperationException -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala deleted file mode 100644 index 63bd7e1..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.drm.logical - -import scala.reflect.ClassTag -import org.apache.mahout.math.drm._ - -/** Logical AB' */ -case class OpABt[K]( - override var A: DrmLike[K], - override var B: DrmLike[Int]) - extends AbstractBinaryOp[K,Int,K] { - - assert(A.ncol == B.ncol, "Incompatible operand geometry") - - /** - * Explicit extraction of key class Tag since traits don't support context bound access; but actual - * implementation knows it - */ - override lazy val keyClassTag: ClassTag[K] = A.keyClassTag - - /** R-like syntax for number of rows. 
*/ - def nrow: Long = A.nrow - - /** R-like syntax for number of columns */ - def ncol: Int = safeToNonNegInt(B.nrow) - - /** Non-zero element count */ - def nNonZero: Long = - // TODO: for purposes of cost calculation, approximate based on operands - throw new UnsupportedOperationException - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala deleted file mode 100644 index 4bb83d0..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.mahout.math.drm.logical - -import scala.reflect.ClassTag -import org.apache.mahout.math.drm.DrmLike -import scala.util.Random - -/** DRM elementwise operator */ -case class OpAewB[K]( - override var A: DrmLike[K], - override var B: DrmLike[K], - val op: String - ) extends AbstractBinaryOp[K, K, K] { - - - assert(A.ncol == B.ncol, "arguments must have same number of columns") - assert(A.nrow == B.nrow, "arguments must have same number of rows") - assert(A.keyClassTag == B.keyClassTag, "Arguments of elementwise operators must have the same row key") - - override protected[mahout] lazy val partitioningTag: Long = - if (A.partitioningTag == B.partitioningTag) A.partitioningTag - else Random.nextLong() - - /** - * Explicit extraction of key class Tag since traits don't support context bound access; but actual - * implementation knows it - */ - override def keyClassTag: ClassTag[K] = A.keyClassTag - - /** R-like syntax for number of rows. */ - def nrow: Long = A.nrow - - /** R-like syntax for number of columns */ - def ncol: Int = A.ncol - -} http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala deleted file mode 100644 index 4f08686..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.mahout.math.drm.logical - -import scala.reflect.ClassTag -import org.apache.mahout.math.drm.DrmLike -import scala.util.Random - -/** - * Operator denoting expressions like 5.0 - A or A * 5.6 - * - * @deprecated use [[OpAewUnaryFunc]] instead - */ -case class OpAewScalar[K]( - override var A: DrmLike[K], - val scalar: Double, - val op: String - ) extends AbstractUnaryOp[K,K] { - - override protected[mahout] lazy val partitioningTag: Long = - if (A.canHaveMissingRows) - Random.nextLong() - else A.partitioningTag - - /** Stuff like `A +1` is always supposed to fix this */ - override protected[mahout] lazy val canHaveMissingRows: Boolean = false - - /** - * Explicit extraction of key class Tag since traits don't support context bound access; but actual - * implementation knows it - */ - override def keyClassTag: ClassTag[K] = A.keyClassTag - - /** R-like syntax for number of rows. 
*/ - def nrow: Long = A.nrow - - /** R-like syntax for number of columns */ - def ncol: Int = A.ncol - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/99a5358f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala deleted file mode 100644 index 0607686..0000000 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.mahout.math.drm.logical - -import scala.reflect.ClassTag -import org.apache.mahout.math.drm.DrmLike -import scala.util.Random - -case class OpAewUnaryFunc[K]( - override var A: DrmLike[K], - val f: (Double) => Double, - val evalZeros:Boolean = false - ) extends AbstractUnaryOp[K,K] with TEwFunc { - - override protected[mahout] lazy val partitioningTag: Long = - if (A.canHaveMissingRows) - Random.nextLong() - else A.partitioningTag - - /** Stuff like `A +1` is always supposed to fix this */ - override protected[mahout] lazy val canHaveMissingRows: Boolean = false - - /** - * Explicit extraction of key class Tag since traits don't support context bound access; but actual - * implementation knows it - */ - override lazy val keyClassTag: ClassTag[K] = A.keyClassTag - - /** R-like syntax for number of rows. */ - def nrow: Long = A.nrow - - /** R-like syntax for number of columns */ - def ncol: Int = A.ncol -} -
