MAHOUT-1800: Pare down ClassTag overuse closes apache/mahout#183
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/6919fd9f Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/6919fd9f Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/6919fd9f Branch: refs/heads/master Commit: 6919fd9febe1585d15e78e51aabcad8fa29235f3 Parents: 57317a5 Author: Andrew Palumbo <[email protected]> Authored: Mon Mar 7 23:10:37 2016 -0500 Committer: Andrew Palumbo <[email protected]> Committed: Mon Mar 7 23:35:37 2016 -0500 ---------------------------------------------------------------------- .../apache/mahout/h2obindings/H2OEngine.scala | 54 +++--- .../classifier/naivebayes/NaiveBayes.scala | 9 +- .../apache/mahout/math/decompositions/ALS.scala | 7 +- .../apache/mahout/math/decompositions/DQR.scala | 5 +- .../mahout/math/decompositions/DSPCA.scala | 5 +- .../mahout/math/decompositions/DSSVD.scala | 11 +- .../mahout/math/drm/CheckpointedDrm.scala | 7 - .../mahout/math/drm/CheckpointedOps.scala | 2 +- .../mahout/math/drm/DistributedEngine.scala | 58 ++++--- .../org/apache/mahout/math/drm/DrmLike.scala | 8 + .../org/apache/mahout/math/drm/DrmLikeOps.scala | 6 +- .../apache/mahout/math/drm/RLikeDrmOps.scala | 19 ++- .../math/drm/logical/AbstractBinaryOp.scala | 39 ++--- .../math/drm/logical/AbstractUnaryOp.scala | 6 +- .../math/drm/logical/CheckpointAction.scala | 3 +- .../apache/mahout/math/drm/logical/OpAB.scala | 8 +- .../mahout/math/drm/logical/OpABAnyKey.scala | 9 +- .../apache/mahout/math/drm/logical/OpABt.scala | 8 +- .../apache/mahout/math/drm/logical/OpAewB.scala | 10 +- .../mahout/math/drm/logical/OpAewScalar.scala | 8 +- .../math/drm/logical/OpAewUnaryFunc.scala | 8 +- .../math/drm/logical/OpAewUnaryFuncFusion.scala | 8 +- .../apache/mahout/math/drm/logical/OpAt.scala | 8 + .../apache/mahout/math/drm/logical/OpAtA.scala | 8 +- .../mahout/math/drm/logical/OpAtAnyKey.scala | 8 +- .../apache/mahout/math/drm/logical/OpAtB.scala | 8 +- 
.../apache/mahout/math/drm/logical/OpAtx.scala | 8 + .../apache/mahout/math/drm/logical/OpAx.scala | 8 +- .../mahout/math/drm/logical/OpCbind.scala | 9 +- .../mahout/math/drm/logical/OpCbindScalar.scala | 8 +- .../mahout/math/drm/logical/OpMapBlock.scala | 13 +- .../apache/mahout/math/drm/logical/OpPar.scala | 8 +- .../mahout/math/drm/logical/OpRbind.scala | 9 +- .../mahout/math/drm/logical/OpRowRange.scala | 8 + .../math/drm/logical/OpTimesLeftMatrix.scala | 8 + .../math/drm/logical/OpTimesRightMatrix.scala | 8 +- .../org/apache/mahout/math/drm/package.scala | 28 +-- .../mahout/math/drm/DrmLikeOpsSuiteBase.scala | 20 ++- .../classifier/naivebayes/SparkNaiveBayes.scala | 2 +- .../apache/mahout/drivers/TrainNBDriver.scala | 4 +- .../mahout/sparkbindings/SparkEngine.scala | 171 ++++++++++--------- .../apache/mahout/sparkbindings/blas/ABt.scala | 4 +- .../apache/mahout/sparkbindings/blas/AewB.scala | 36 ++-- .../mahout/sparkbindings/blas/AinCoreB.scala | 5 +- .../apache/mahout/sparkbindings/blas/Ax.scala | 3 +- .../mahout/sparkbindings/blas/CbindAB.scala | 7 +- .../mahout/sparkbindings/blas/DrmRddOps.scala | 2 +- .../mahout/sparkbindings/blas/MapBlock.scala | 7 +- .../apache/mahout/sparkbindings/blas/Par.scala | 3 +- .../mahout/sparkbindings/blas/RbindAB.scala | 4 +- .../mahout/sparkbindings/blas/package.scala | 4 +- .../drm/CheckpointedDrmSparkOps.scala | 5 +- .../mahout/sparkbindings/drm/package.scala | 8 +- .../apache/mahout/sparkbindings/package.scala | 4 +- 54 files changed, 460 insertions(+), 274 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala ---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala index bcf3507..5567f84 100644 --- 
a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala +++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala @@ -36,16 +36,16 @@ object H2OEngine extends DistributedEngine { // By default, use Hadoop 1 utils var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil - def colMeans[K:ClassTag](drm: CheckpointedDrm[K]): Vector = + def colMeans[K](drm: CheckpointedDrm[K]): Vector = H2OHelper.colMeans(drm.h2odrm.frame) - def colSums[K:ClassTag](drm: CheckpointedDrm[K]): Vector = + def colSums[K](drm: CheckpointedDrm[K]): Vector = H2OHelper.colSums(drm.h2odrm.frame) - def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double = + def norm[K](drm: CheckpointedDrm[K]): Double = H2OHelper.sumSqr(drm.h2odrm.frame) - def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = + def numNonZeroElementsPerColumn[K](drm: CheckpointedDrm[K]): Vector = H2OHelper.nonZeroCnt(drm.h2odrm.frame) /** Broadcast support */ @@ -94,33 +94,33 @@ object H2OEngine extends DistributedEngine { case OpAtAnyKey(_) => throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.") // Linear algebra operators - case op@OpAt(a) => At.exec(tr2phys(a)(op.classTagA)) - case op@OpABt(a, b) => ABt.exec(tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB)) - case op@OpAtB(a, b) => AtB.exec(tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB)) - case op@OpAtA(a) => AtA.exec(tr2phys(a)(op.classTagA)) - case op@OpAx(a, v) => Ax.exec(tr2phys(a)(op.classTagA), v) - case op@OpAtx(a, v) => Atx.exec(tr2phys(a)(op.classTagA), v) - case op@OpAewUnaryFunc(a, f, z) => AewUnary.exec(tr2phys(a)(op.classTagA), op.f, z) - case op@OpAewUnaryFuncFusion(a, f) => AewUnary.exec(tr2phys(a)(op.classTagA), op.f, op.evalZeros) - case op@OpAewB(a, b, opId) => AewB.exec(tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB), opId) - case op@OpAewScalar(a, s, opId) => AewScalar.exec(tr2phys(a)(op.classTagA), s, opId) - case op@OpTimesRightMatrix(a, m) => 
TimesRightMatrix.exec(tr2phys(a)(op.classTagA), m) + case op@OpAt(a) => At.exec(tr2phys(a)(a.keyClassTag)) + case op@OpABt(a, b) => ABt.exec(tr2phys(a)(a.keyClassTag), tr2phys(b)(b.keyClassTag)) + case op@OpAtB(a, b) => AtB.exec(tr2phys(a)(a.keyClassTag), tr2phys(b)(b.keyClassTag)) + case op@OpAtA(a) => AtA.exec(tr2phys(a)(a.keyClassTag)) + case op@OpAx(a, v) => Ax.exec(tr2phys(a)(a.keyClassTag), v) + case op@OpAtx(a, v) => Atx.exec(tr2phys(a)(a.keyClassTag), v) + case op@OpAewUnaryFunc(a, f, z) => AewUnary.exec(tr2phys(a)(a.keyClassTag), op.f, z) + case op@OpAewUnaryFuncFusion(a, f) => AewUnary.exec(tr2phys(a)(a.keyClassTag), op.f, op.evalZeros) + case op@OpAewB(a, b, opId) => AewB.exec(tr2phys(a)(a.keyClassTag), tr2phys(b)(b.keyClassTag), opId) + case op@OpAewScalar(a, s, opId) => AewScalar.exec(tr2phys(a)(a.keyClassTag), s, opId) + case op@OpTimesRightMatrix(a, m) => TimesRightMatrix.exec(tr2phys(a)(a.keyClassTag), m) // Non arithmetic - case op@OpCbind(a, b) => Cbind.exec(tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB)) - case op@OpCbindScalar(a, d, left) => CbindScalar.exec(tr2phys(a)(op.classTagA), d, left) - case op@OpRbind(a, b) => Rbind.exec(tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB)) - case op@OpRowRange(a, r) => RowRange.exec(tr2phys(a)(op.classTagA), r) + case op@OpCbind(a, b) => Cbind.exec(tr2phys(a)(a.keyClassTag), tr2phys(b)(b.keyClassTag)) + case op@OpCbindScalar(a, d, left) => CbindScalar.exec(tr2phys(a)(a.keyClassTag), d, left) + case op@OpRbind(a, b) => Rbind.exec(tr2phys(a)(a.keyClassTag), tr2phys(b)(b.keyClassTag)) + case op@OpRowRange(a, r) => RowRange.exec(tr2phys(a)(a.keyClassTag), r) // Custom operators - case blockOp: OpMapBlock[K, _] => MapBlock.exec(tr2phys(blockOp.A)(blockOp.classTagA), blockOp.ncol, blockOp.bmf, - (blockOp.classTagK == implicitly[ClassTag[String]]), blockOp.classTagA, blockOp.classTagK) - case op@OpPar(a, m, e) => Par.exec(tr2phys(a)(op.classTagA), m, e) + case blockOp: OpMapBlock[K, _] => 
MapBlock.exec(tr2phys(blockOp.A)(blockOp.A.keyClassTag), blockOp.ncol, blockOp.bmf, + (blockOp.keyClassTag == classTag[String]), blockOp.A.keyClassTag, blockOp.keyClassTag) + case op@OpPar(a, m, e) => Par.exec(tr2phys(a)(a.keyClassTag), m, e) case cp: CheckpointedDrm[K] => cp.h2odrm case _ => throw new IllegalArgumentException("Internal:Optimizer has no exec policy for operator %s." .format(oper)) } } - implicit def cp2cph2o[K:ClassTag](drm: CheckpointedDrm[K]): CheckpointedDrmH2O[K] = drm.asInstanceOf[CheckpointedDrmH2O[K]] + implicit def cp2cph2o[K](drm: CheckpointedDrm[K]): CheckpointedDrmH2O[K] = drm.asInstanceOf[CheckpointedDrmH2O[K]] /** stub class not implemented in H2O */ abstract class IndexedDatasetH2O(val matrix: CheckpointedDrm[Int], val rowIDs: BiDictionary, val columnIDs: BiDictionary) @@ -167,23 +167,23 @@ object H2OEngine extends DistributedEngine { * TODO: implement this please. * */ - override def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc) + override def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc) : Matrix = H2OHelper.allreduceBlock(drm.h2odrm, bmf, rf) /** * TODO: implement this please. */ - override def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples: Int, replacement: Boolean): Matrix = ??? + override def drmSampleKRows[K](drmX: DrmLike[K], numSamples: Int, replacement: Boolean): Matrix = ??? /** * (Optional) Sampling operation. Consistent with Spark semantics of the same. * TODO: implement this please. */ - override def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean): DrmLike[K] = ??? + override def drmSampleRows[K](drmX: DrmLike[K], fraction: Double, replacement: Boolean): DrmLike[K] = ??? /** * TODO: implement this please. 
*/ - override def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean) + override def drm2IntKeyed[K](drmX: DrmLike[K], computeMap: Boolean) : (DrmLike[Int], Option[DrmLike[K]]) = ??? } http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/classifier/naivebayes/NaiveBayes.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/classifier/naivebayes/NaiveBayes.scala b/math-scala/src/main/scala/org/apache/mahout/classifier/naivebayes/NaiveBayes.scala index a15ca09..5a17144 100644 --- a/math-scala/src/main/scala/org/apache/mahout/classifier/naivebayes/NaiveBayes.scala +++ b/math-scala/src/main/scala/org/apache/mahout/classifier/naivebayes/NaiveBayes.scala @@ -110,7 +110,7 @@ trait NaiveBayes extends java.io.Serializable{ * aggregatedByLabelObservationDrm is a DrmLike[Int] of aggregated * TF or TF-IDF counts per label */ - def extractLabelsAndAggregateObservations[K: ClassTag](stringKeyedObservations: DrmLike[K], + def extractLabelsAndAggregateObservations[K](stringKeyedObservations: DrmLike[K], cParser: CategoryParser = seq2SparseCategoryParser) (implicit ctx: DistributedContext): (mutable.HashMap[String, Integer], DrmLike[Int])= { @@ -120,13 +120,16 @@ trait NaiveBayes extends java.io.Serializable{ val numDocs=stringKeyedObservations.nrow val numFeatures=stringKeyedObservations.ncol + // For mapblocks that return K. + implicit val ktag = stringKeyedObservations.keyClassTag + // Extract categories from labels assigned by seq2sparse // Categories are Stored in Drm Keys as eg.: /Category/document_id // Get a new DRM with a single column so that we don't have to collect the // DRM into memory upfront. 
- val strippedObeservations= stringKeyedObservations.mapBlock(ncol=1){ - case(keys, block) => + val strippedObeservations = stringKeyedObservations.mapBlock(ncol = 1) { + case (keys, block) => val blockB = block.like(keys.size, 1) keys -> blockB } http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala index 4e2f45c..92d0e12 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala @@ -42,7 +42,7 @@ private[math] object ALS { * @param drmV V matrix * @param iterationsRMSE RMSE values afeter each of iteration performed */ - class Result[K: ClassTag](val drmU: DrmLike[K], val drmV: DrmLike[Int], val iterationsRMSE: Iterable[Double]) { + class Result[K](val drmU: DrmLike[K], val drmV: DrmLike[Int], val iterationsRMSE: Iterable[Double]) { def toTuple = (drmU, drmV, iterationsRMSE) } @@ -74,7 +74,7 @@ private[math] object ALS { * @tparam K row key type of the input (100 is probably more than enough) * @return { @link org.apache.mahout.math.drm.decompositions.ALS.Result} */ - def dals[K: ClassTag]( + def dals[K]( drmA: DrmLike[K], k: Int = 50, lambda: Double = 0.0, @@ -85,6 +85,9 @@ private[math] object ALS { assert(convergenceThreshold < 1.0, "convergenceThreshold") assert(maxIterations >= 1, "maxIterations") + // Some mapblock() usage may require to know ClassTag[K] bound + implicit val ktag = drmA.keyClassTag + val drmAt = drmA.t // Initialize U and V so that they are identically distributed to A or A' http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala 
---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala index 866ee34..9173d09 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala @@ -39,7 +39,10 @@ object DQR { * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so * their RDD should be able to zip successfully. */ - def dqrThin[K: ClassTag](drmA: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = { + def dqrThin[K](drmA: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = { + + // Some mapBlock() calls need it + implicit val ktag = drmA.keyClassTag if (drmA.ncol > 5000) warn("A is too fat. A'A must fit in memory and easily broadcasted.") http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala index c98ee2e..4a769b9 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala @@ -38,9 +38,12 @@ object DSPCA { * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them * e.g. save them to hdfs in order to trigger their computation. 
*/ - def dspca[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0): + def dspca[K](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0): (DrmLike[K], DrmLike[Int], Vector) = { + // Some mapBlock() calls need it + implicit val ktag = drmA.keyClassTag + val drmAcp = drmA.checkpoint() implicit val ctx = drmAcp.context http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala index cecaec8..acd1dc1 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala @@ -23,9 +23,12 @@ object DSSVD { * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them * e.g. save them to hdfs in order to trigger their computation. */ - def dssvd[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0): + def dssvd[K](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0): (DrmLike[K], DrmLike[Int], Vector) = { + // Some mapBlock() calls need it + implicit val ktag = drmA.keyClassTag + val drmAcp = drmA.checkpoint() val m = drmAcp.nrow @@ -43,9 +46,9 @@ object DSSVD { // instantiate the Omega random matrix view in the backend instead. That way serialized closure // is much more compact. 
var drmY = drmAcp.mapBlock(ncol = r) { - case (keys, blockA) => + case (keys, blockA) ⇒ val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed) - keys -> blockY + keys → blockY }.checkpoint() var drmQ = dqrThin(drmY)._1 @@ -62,7 +65,7 @@ object DSSVD { trace(s"dssvd:drmB'=${drmBt.collect}.") - for (i <- 0 until q) { + for (i ← 0 until q) { drmY = drmAcp %*% drmBt drmQ = dqrThin(drmY.checkpoint())._1 // Checkpoint Q if last iteration http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala index 7f97481..78b7ce8 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala @@ -34,13 +34,6 @@ trait CheckpointedDrm[K] extends DrmLike[K] { /** If this checkpoint is already declared cached, uncache. 
*/ def uncache(): this.type - /** - * Explicit extraction of key class Tag since traits don't support context bound access; but actual - * implementation knows it - */ - def keyClassTag: ClassTag[K] - - /** changes the number of rows without touching the underlying data */ def newRowCardinality(n: Int): CheckpointedDrm[K] http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala index c43c6c7..da8ce9f 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala @@ -27,7 +27,7 @@ import org.apache.mahout.math.scalabindings.RLikeOps._ * the DRMBase once they stabilize. * */ -class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) { +class CheckpointedOps[K](val drm: CheckpointedDrm[K]) { /** Column sums. At this point this runs on checkpoint and collects in-core vector. */ http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala index 9ea90a1..ed93d89 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala @@ -47,18 +47,18 @@ trait DistributedEngine { def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] /** Engine-specific colSums implementation based on a checkpoint. 
*/ - def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector + def colSums[K](drm: CheckpointedDrm[K]): Vector /** Optional engine-specific all reduce tensor operation. */ - def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix + def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */ - def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector + def numNonZeroElementsPerColumn[K](drm: CheckpointedDrm[K]): Vector /** Engine-specific colMeans implementation based on a checkpoint. */ - def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector + def colMeans[K](drm: CheckpointedDrm[K]): Vector - def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double + def norm[K](drm: CheckpointedDrm[K]): Double /** Broadcast support */ def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector] @@ -94,7 +94,7 @@ trait DistributedEngine { * Convert non-int-keyed matrix to an int-keyed, computing optionally mapping from old keys * to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix. */ - def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) + def drm2IntKeyed[K](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) /** * (Optional) Sampling operation. Consistent with Spark semantics of the same. 
@@ -104,9 +104,9 @@ trait DistributedEngine { * @tparam K * @return */ - def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K] + def drmSampleRows[K](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K] - def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement:Boolean = false) : Matrix + def drmSampleKRows[K](drmX: DrmLike[K], numSamples:Int, replacement:Boolean = false) : Matrix /** * Load IndexedDataset from text delimited format. @@ -137,7 +137,7 @@ object DistributedEngine { private val log = Logger.getLogger(DistributedEngine.getClass) /** This is mostly multiplication operations rewrites */ - private def pass1[K: ClassTag](action: DrmLike[K]): DrmLike[K] = { + private def pass1[K](action: DrmLike[K]): DrmLike[K] = { action match { @@ -154,16 +154,22 @@ object DistributedEngine { null } } - case OpAB(OpAt(a), b) if (a == b) ⇒ OpAtA(pass1(a)) - case OpABAnyKey(OpAtAnyKey(a), b) if (a == b) ⇒ OpAtA(pass1(a)) + case OpAB(OpAt(a), b) if a == b ⇒ OpAtA(pass1(a)) + case OpABAnyKey(OpAtAnyKey(a), b) if a == b ⇒ OpAtA(pass1(a)) + + // A small rule change: Now that we have removed ClassTag at the %*% operation, it doesn't + // match b[Int] case automatically any longer. So, we need to check and rewrite it dynamically + // and re-run pass1 again on the obtained tree. + case OpABAnyKey(a, b) if b.keyClassTag == ClassTag.Int ⇒ pass1(OpAB(a, b.asInstanceOf[DrmLike[Int]])) + case OpAtAnyKey(a) if a.keyClassTag == ClassTag.Int ⇒ pass1(OpAt(a.asInstanceOf[DrmLike[Int]])) // For now, rewrite left-multiply via transpositions, i.e. 
// inCoreA %*% B = (B' %*% inCoreA')' case op@OpTimesLeftMatrix(a, b) ⇒ - OpAt(OpTimesRightMatrix(A = OpAt(pass1(b)), right = a.t)) + OpAt(OpTimesRightMatrix(A = OpAt(pass1(b)), right = a.t)) // Add vertical row index concatenation for rbind() on DrmLike[Int] fragments - case op@OpRbind(a, b) if (implicitly[ClassTag[K]] == ClassTag.Int) ⇒ + case op@OpRbind(a, b) if (op.keyClassTag == ClassTag.Int) ⇒ // Make sure closure sees only local vals, not attributes. We need to do these ugly casts // around because compiler could not infer that K is the same as Int, based on if() above. @@ -179,18 +185,18 @@ object DistributedEngine { // For everything else we just pass-thru the operator arguments to optimizer case uop: AbstractUnaryOp[_, K] ⇒ - uop.A = pass1(uop.A)(uop.classTagA) + uop.A = pass1(uop.A) uop case bop: AbstractBinaryOp[_, _, K] ⇒ - bop.A = pass1(bop.A)(bop.classTagA) - bop.B = pass1(bop.B)(bop.classTagB) + bop.A = pass1(bop.A) + bop.B = pass1(bop.B) bop } } /** This would remove stuff like A.t.t that previous step may have created */ - private def pass2[K: ClassTag](action: DrmLike[K]): DrmLike[K] = { + private def pass2[K](action: DrmLike[K]): DrmLike[K] = { action match { // Fusion of unary funcs into single, like 1 + x * x. 
@@ -206,24 +212,24 @@ object DistributedEngine { pass2(OpAewUnaryFuncFusion(a, op.ff :+ op2)) // A.t.t => A - case OpAt(top@OpAt(a)) ⇒ pass2(a)(top.classTagA) + case OpAt(top@OpAt(a)) ⇒ pass2(a) // Stop at checkpoints case cd: CheckpointedDrm[_] ⇒ action // For everything else we just pass-thru the operator arguments to optimizer case uop: AbstractUnaryOp[_, K] ⇒ - uop.A = pass2(uop.A)(uop.classTagA) + uop.A = pass2(uop.A) uop case bop: AbstractBinaryOp[_, _, K] ⇒ - bop.A = pass2(bop.A)(bop.classTagA) - bop.B = pass2(bop.B)(bop.classTagB) + bop.A = pass2(bop.A) + bop.B = pass2(bop.B) bop } } /** Some further rewrites that are conditioned on A.t.t removal */ - private def pass3[K: ClassTag](action: DrmLike[K]): DrmLike[K] = { + private def pass3[K](action: DrmLike[K]): DrmLike[K] = { action match { // matrix products. @@ -240,18 +246,18 @@ object DistributedEngine { case OpAB(a, b) ⇒ OpABt(pass3(a), OpAt(pass3(b))) // Rewrite A'x - case op@OpAx(op1@OpAt(a), x) ⇒ OpAtx(pass3(a)(op1.classTagA), x) + case op@OpAx(op1@OpAt(a), x) ⇒ OpAtx(pass3(a), x) // Stop at checkpoints case cd: CheckpointedDrm[_] ⇒ action // For everything else we just pass-thru the operator arguments to optimizer case uop: AbstractUnaryOp[_, K] ⇒ - uop.A = pass3(uop.A)(uop.classTagA) + uop.A = pass3(uop.A) uop case bop: AbstractBinaryOp[_, _, K] ⇒ - bop.A = pass3(bop.A)(bop.classTagA) - bop.B = pass3(bop.B)(bop.classTagB) + bop.A = pass3(bop.A) + bop.B = pass3(bop.B) bop } } http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala index 8c615bf..23f5fc6 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala @@ -17,6 
+17,8 @@ package org.apache.mahout.math.drm +import scala.reflect.ClassTag + /** * * Basic DRM trait. @@ -44,6 +46,12 @@ trait DrmLike[K] { def ncol: Int /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + def keyClassTag: ClassTag[K] + + /** * Action operator -- does not necessary means Spark action; but does mean running BLAS optimizer * and writing down Spark graph lineage since last checkpointed DRM. */ http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala index 19432d0..e2c6e17 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala @@ -22,7 +22,7 @@ import org.apache.mahout.math.scalabindings._ import org.apache.mahout.math.drm.logical.{OpPar, OpMapBlock, OpRowRange} /** Common Drm ops */ -class DrmLikeOps[K: ClassTag](protected[drm] val drm: DrmLike[K]) { +class DrmLikeOps[K](protected[drm] val drm: DrmLike[K]) { /** * Parallelism adjustments. 
<P/> @@ -90,9 +90,11 @@ class DrmLikeOps[K: ClassTag](protected[drm] val drm: DrmLike[K]) { import RLikeDrmOps._ import RLikeOps._ + implicit val ktag = drm.keyClassTag + val rowSrc: DrmLike[K] = if (rowRange != ::) { - if (implicitly[ClassTag[Int]] == implicitly[ClassTag[K]]) { + if (ClassTag.Int == ktag) { assert(rowRange.head >= 0 && rowRange.last < drm.nrow, "rows range out of range") val intKeyed = drm.asInstanceOf[DrmLike[Int]] http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala index aac7da1..54afc0e 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala @@ -25,7 +25,7 @@ import org.apache.mahout.math.drm.logical._ import org.apache.mahout.math.scalabindings._ import RLikeOps._ -class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) { +class RLikeDrmOps[K](drm: DrmLike[K]) extends DrmLikeOps[K](drm) { import RLikeDrmOps._ import org.apache.mahout.math.scalabindings._ @@ -64,11 +64,9 @@ class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) { def /:(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = that / _, evalZeros = true) - def :%*%(that: DrmLike[Int]): DrmLike[K] = OpAB[K](A = this.drm, B = that) + def :%*%[B](that: DrmLike[B]): DrmLike[K] = OpABAnyKey[B,K](A = this.drm, B=that) - def %*%[B: ClassTag](that: DrmLike[B]): DrmLike[K] = OpABAnyKey[B, K](A = this.drm, B = that) - - def %*%(that: DrmLike[Int]): DrmLike[K] = this :%*% that + def %*%[B](that: DrmLike[B]): DrmLike[K] = this :%*% that def :%*%(that: Matrix): DrmLike[K] = OpTimesRightMatrix[K](A = this.drm, right = that) @@ -98,6 +96,9 @@ class 
RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) { * @return map of row keys into row sums, front-end collected. */ def rowSumsMap(): Map[String, Double] = { + + implicit val ktag = drm.keyClassTag + val m = drm.mapBlock(ncol = 1) { case (keys, block) => keys -> dense(block.rowSums).t }.collect @@ -161,11 +162,11 @@ object RLikeDrmOps { implicit def drmInt2RLikeOps(drm: DrmLike[Int]): RLikeDrmIntOps = new RLikeDrmIntOps(drm) - implicit def drm2RLikeOps[K: ClassTag](drm: DrmLike[K]): RLikeDrmOps[K] = new RLikeDrmOps[K](drm) + implicit def drm2RLikeOps[K](drm: DrmLike[K]): RLikeDrmOps[K] = new RLikeDrmOps[K](drm) - implicit def rlikeOps2Drm[K: ClassTag](ops: RLikeDrmOps[K]): DrmLike[K] = ops.drm + implicit def rlikeOps2Drm[K](ops: RLikeDrmOps[K]): DrmLike[K] = ops.drm - implicit def ops2Drm[K: ClassTag](ops: DrmLikeOps[K]): DrmLike[K] = ops.drm + implicit def ops2Drm[K](ops: DrmLikeOps[K]): DrmLike[K] = ops.drm // Removed in move to 1.2.1 PR #74 https://github.com/apache/mahout/pull/74/files // Not sure why. @@ -175,5 +176,5 @@ object RLikeDrmOps { * This is probably dangerous since it triggers implicit checkpointing with default storage level * setting. 
*/ - implicit def drm2cpops[K: ClassTag](drm: DrmLike[K]): CheckpointedOps[K] = new CheckpointedOps(drm.checkpoint()) + implicit def drm2cpops[K](drm: DrmLike[K]): CheckpointedOps[K] = new CheckpointedOps(drm.checkpoint()) } http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala index 3b6b8bf..9fba286 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala @@ -21,34 +21,25 @@ import scala.reflect.ClassTag import org.apache.mahout.math.drm.{DistributedContext, DrmLike} /** - * Any logical binary operator (such as A + B). - * <P/> - * - * Any logical operator derived from this is also capabile of triggering optimizer checkpoint, hence, - * it also inherits CheckpointAction. - * <P/> - * - * @param evidence$1 LHS key type tag - * @param evidence$2 RHS key type tag - * @param evidence$3 expression key type tag - * @tparam A LHS key type - * @tparam B RHS key type - * @tparam K result key type - */ -abstract class AbstractBinaryOp[A: ClassTag, B: ClassTag, K: ClassTag] - extends CheckpointAction[K] with DrmLike[K] { + * Any logical binary operator (such as A + B). + * <P/> + * + * Any logical operator derived from this is also capabile of triggering optimizer checkpoint, hence, + * it also inherits CheckpointAction. 
+ * <P/> + * + * @tparam A LHS key type + * @tparam B RHS key type + * @tparam K result key type + */ +abstract class AbstractBinaryOp[A, B, K] + extends CheckpointAction[K] with DrmLike[K] { protected[drm] var A: DrmLike[A] + protected[drm] var B: DrmLike[B] + lazy val context: DistributedContext = A.context protected[mahout] def canHaveMissingRows: Boolean = false - - // These are explicit evidence export. Sometimes scala falls over to figure that on its own. - def classTagA: ClassTag[A] = implicitly[ClassTag[A]] - - def classTagB: ClassTag[B] = implicitly[ClassTag[B]] - - def classTagK: ClassTag[K] = implicitly[ClassTag[K]] - } http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala index 60b2c77..28cf87d 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala @@ -21,17 +21,13 @@ import scala.reflect.ClassTag import org.apache.mahout.math.drm.{DistributedContext, DrmLike} /** Abstract unary operator */ -abstract class AbstractUnaryOp[A: ClassTag, K: ClassTag] +abstract class AbstractUnaryOp[A, K] extends CheckpointAction[K] with DrmLike[K] { protected[mahout] var A: DrmLike[A] lazy val context: DistributedContext = A.context - def classTagA: ClassTag[A] = implicitly[ClassTag[A]] - - def classTagK: ClassTag[K] = implicitly[ClassTag[K]] - override protected[mahout] lazy val canHaveMissingRows: Boolean = A.canHaveMissingRows } http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala 
---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala index a7934a3..6daaf0e 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala @@ -22,7 +22,7 @@ import scala.util.Random import org.apache.mahout.math.drm._ /** Implementation of distributed expression checkpoint and optimizer. */ -abstract class CheckpointAction[K: ClassTag] extends DrmLike[K] { +abstract class CheckpointAction[K] extends DrmLike[K] { protected[mahout] lazy val partitioningTag: Long = Random.nextLong() @@ -37,6 +37,7 @@ abstract class CheckpointAction[K: ClassTag] extends DrmLike[K] { */ def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = cp match { case None => + implicit val cpTag = this.keyClassTag val plan = context.optimizerRewrite(this) val physPlan = context.toPhysical(plan, cacheHint) cp = Some(physPlan) http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala index 804a00e..e5316a0 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAB.scala @@ -21,13 +21,19 @@ import scala.reflect.ClassTag import org.apache.mahout.math.drm.DrmLike /** Logical AB */ -case class OpAB[K: ClassTag ]( +case class OpAB[K]( override var A: DrmLike[K], override var B: DrmLike[Int]) extends AbstractBinaryOp[K, Int, K] { assert(A.ncol == B.nrow, "Incompatible operand 
geometry") + /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + override def keyClassTag: ClassTag[K] = A.keyClassTag + /** R-like syntax for number of rows. */ def nrow: Long = A.nrow http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala index f131f3f..8437cdd 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABAnyKey.scala @@ -21,13 +21,20 @@ import scala.reflect.ClassTag import org.apache.mahout.math.drm.DrmLike /** Logical AB */ -case class OpABAnyKey[B:ClassTag, K: ClassTag ]( +case class OpABAnyKey[B, K ]( override var A: DrmLike[K], override var B: DrmLike[B]) extends AbstractBinaryOp[K, B, K] { assert(A.ncol == B.nrow, "Incompatible operand geometry") + + /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + override def keyClassTag: ClassTag[K] = A.keyClassTag + /** R-like syntax for number of rows. 
*/ def nrow: Long = A.nrow http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala index f6503ed..63bd7e1 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpABt.scala @@ -21,13 +21,19 @@ import scala.reflect.ClassTag import org.apache.mahout.math.drm._ /** Logical AB' */ -case class OpABt[K: ClassTag]( +case class OpABt[K]( override var A: DrmLike[K], override var B: DrmLike[Int]) extends AbstractBinaryOp[K,Int,K] { assert(A.ncol == B.ncol, "Incompatible operand geometry") + /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + override lazy val keyClassTag: ClassTag[K] = A.keyClassTag + /** R-like syntax for number of rows. 
*/ def nrow: Long = A.nrow http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala index da7c0d5..4bb83d0 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewB.scala @@ -22,21 +22,27 @@ import org.apache.mahout.math.drm.DrmLike import scala.util.Random /** DRM elementwise operator */ -case class OpAewB[K: ClassTag]( +case class OpAewB[K]( override var A: DrmLike[K], override var B: DrmLike[K], val op: String ) extends AbstractBinaryOp[K, K, K] { - assert(A.ncol == B.ncol, "arguments must have same number of columns") assert(A.nrow == B.nrow, "arguments must have same number of rows") + assert(A.keyClassTag == B.keyClassTag, "Arguments of elementwise operators must have the same row key") override protected[mahout] lazy val partitioningTag: Long = if (A.partitioningTag == B.partitioningTag) A.partitioningTag else Random.nextLong() + /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + override def keyClassTag: ClassTag[K] = A.keyClassTag + /** R-like syntax for number of rows. 
*/ def nrow: Long = A.nrow http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala index dbcb366..4f08686 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewScalar.scala @@ -26,7 +26,7 @@ import scala.util.Random * * @deprecated use [[OpAewUnaryFunc]] instead */ -case class OpAewScalar[K: ClassTag]( +case class OpAewScalar[K]( override var A: DrmLike[K], val scalar: Double, val op: String @@ -40,6 +40,12 @@ case class OpAewScalar[K: ClassTag]( /** Stuff like `A +1` is always supposed to fix this */ override protected[mahout] lazy val canHaveMissingRows: Boolean = false + /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + override def keyClassTag: ClassTag[K] = A.keyClassTag + /** R-like syntax for number of rows. 
*/ def nrow: Long = A.nrow http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala index 71489ab..6f93980 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala @@ -24,7 +24,7 @@ import scala.util.Random /** * @author dmitriy */ -case class OpAewUnaryFunc[K: ClassTag]( +case class OpAewUnaryFunc[K]( override var A: DrmLike[K], val f: (Double) => Double, val evalZeros:Boolean = false @@ -38,6 +38,12 @@ case class OpAewUnaryFunc[K: ClassTag]( /** Stuff like `A +1` is always supposed to fix this */ override protected[mahout] lazy val canHaveMissingRows: Boolean = false + /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + override lazy val keyClassTag: ClassTag[K] = A.keyClassTag + /** R-like syntax for number of rows. 
*/ def nrow: Long = A.nrow http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala index ed95f4f..5b0133f 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala @@ -25,7 +25,7 @@ import collection._ /** * Composition of unary elementwise functions. */ -case class OpAewUnaryFuncFusion[K: ClassTag]( +case class OpAewUnaryFuncFusion[K]( override var A: DrmLike[K], var ff:List[OpAewUnaryFunc[K]] = Nil ) extends AbstractUnaryOp[K,K] with TEwFunc { @@ -38,6 +38,12 @@ case class OpAewUnaryFuncFusion[K: ClassTag]( /** Stuff like `A +1` is always supposed to fix this */ override protected[mahout] lazy val canHaveMissingRows: Boolean = false + /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + override def keyClassTag: ClassTag[K] = A.keyClassTag + /** R-like syntax for number of rows. 
*/ def nrow: Long = A.nrow http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala index 4791301..59c71bd 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAt.scala @@ -19,11 +19,19 @@ package org.apache.mahout.math.drm.logical import org.apache.mahout.math.drm._ +import scala.reflect.ClassTag + /** Logical A' */ case class OpAt( override var A: DrmLike[Int]) extends AbstractUnaryOp[Int, Int] { + /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + override def keyClassTag = ClassTag.Int + /** R-like syntax for number of rows. 
*/ def nrow: Long = A.ncol http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala index ad2a5d8..4c01f46 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtA.scala @@ -21,10 +21,16 @@ import scala.reflect.ClassTag import org.apache.mahout.math.drm.DrmLike /** A'A */ -case class OpAtA[K: ClassTag]( +case class OpAtA[K]( override var A: DrmLike[K] ) extends AbstractUnaryOp[K, Int] { + /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + override def keyClassTag = ClassTag.Int + /** R-like syntax for number of rows. 
*/ def nrow: Long = A.ncol http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala index 4e1dd5c..b23dca7 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtAnyKey.scala @@ -21,10 +21,16 @@ import scala.reflect.ClassTag import org.apache.mahout.math.drm._ /** Logical A' for any row key to support A'A optimizations */ -case class OpAtAnyKey[A: ClassTag]( +case class OpAtAnyKey[A]( override var A: DrmLike[A]) extends AbstractUnaryOp[A, Int] { + /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + override def keyClassTag = ClassTag.Int + /** R-like syntax for number of rows. 
*/ def nrow: Long = A.ncol http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala index ef3ae6b..7ec8585 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtB.scala @@ -21,13 +21,19 @@ import scala.reflect.ClassTag import org.apache.mahout.math.drm.DrmLike /** Logical A'B */ -case class OpAtB[A: ClassTag]( +case class OpAtB[A]( override var A: DrmLike[A], override var B: DrmLike[A]) extends AbstractBinaryOp[A, A, Int] { assert(A.nrow == B.nrow, "Incompatible operand geometry") + /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + override def keyClassTag = ClassTag.Int + /** R-like syntax for number of rows. */ def nrow: Long = A.ncol http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala index 36769c7..97b6de1 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAtx.scala @@ -22,6 +22,8 @@ import org.apache.mahout.math.scalabindings._ import RLikeOps._ import org.apache.mahout.math.drm._ +import scala.reflect.ClassTag + /** Logical A'x. 
*/ case class OpAtx( override var A: DrmLike[Int], @@ -32,6 +34,12 @@ case class OpAtx( assert(A.nrow == x.length, "Incompatible operand geometry") + /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + override val keyClassTag = ClassTag.Int + /** R-like syntax for number of rows. */ def nrow: Long = safeToNonNegInt(A.ncol) http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala index a726989..d25e0d9 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAx.scala @@ -24,7 +24,7 @@ import RLikeOps._ import org.apache.mahout.math.drm.DrmLike /** Logical Ax. */ -case class OpAx[K: ClassTag]( +case class OpAx[K]( override var A: DrmLike[K], val x: Vector ) extends AbstractUnaryOp[K, K] { @@ -33,6 +33,12 @@ case class OpAx[K: ClassTag]( assert(A.ncol == x.length, "Incompatible operand geometry") + /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + override def keyClassTag: ClassTag[K] = A.keyClassTag + /** R-like syntax for number of rows. 
*/ def nrow: Long = A.nrow http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala index 0598551..932f133 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala @@ -22,12 +22,19 @@ import org.apache.mahout.math.drm.DrmLike import scala.util.Random /** cbind() logical operator */ -case class OpCbind[K: ClassTag]( +case class OpCbind[K]( override var A: DrmLike[K], override var B: DrmLike[K] ) extends AbstractBinaryOp[K, K, K] { assert(A.nrow == B.nrow, "arguments must have same number of rows") + require(A.keyClassTag == B.keyClassTag, "arguments must have same row key") + + /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + override def keyClassTag = A.keyClassTag override protected[mahout] lazy val partitioningTag: Long = if (A.partitioningTag == B.partitioningTag) A.partitioningTag http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala index 5aee518..99c2bfa 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala @@ -19,7 +19,7 @@ package org.apache.mahout.math.drm.logical import reflect.ClassTag import 
org.apache.mahout.math.drm.DrmLike -case class OpCbindScalar[K:ClassTag]( +case class OpCbindScalar[K]( override var A:DrmLike[K], var x:Double, val leftBind:Boolean ) extends AbstractUnaryOp[K,K] { @@ -28,6 +28,12 @@ case class OpCbindScalar[K:ClassTag]( override protected[mahout] lazy val partitioningTag: Long = A.partitioningTag + /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + override def keyClassTag = A.keyClassTag + /** R-like syntax for number of rows. */ def nrow: Long = A.nrow http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala index a1cd718..95e690b 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpMapBlock.scala @@ -17,13 +17,12 @@ package org.apache.mahout.math.drm.logical -import scala.reflect.ClassTag -import org.apache.mahout.math.scalabindings._ -import RLikeOps._ import org.apache.mahout.math.drm.{BlockMapFunc, DrmLike} + +import scala.reflect.{ClassTag, classTag} import scala.util.Random -case class OpMapBlock[S: ClassTag, R: ClassTag]( +case class OpMapBlock[S, R: ClassTag]( override var A: DrmLike[S], val bmf: BlockMapFunc[S, R], val _ncol: Int = -1, @@ -34,6 +33,12 @@ case class OpMapBlock[S: ClassTag, R: ClassTag]( override protected[mahout] lazy val partitioningTag: Long = if (identicallyPartitioned) A.partitioningTag else Random.nextLong() + /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + override def keyClassTag = classTag[R] + 
/** R-like syntax for number of rows. */ def nrow: Long = if (_nrow >= 0) _nrow else A.nrow http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala index f438728..0fadce3 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala @@ -4,12 +4,18 @@ import org.apache.mahout.math.drm.DrmLike import scala.reflect.ClassTag /** Parallelism operator */ -case class OpPar[K: ClassTag]( +case class OpPar[K]( override var A: DrmLike[K], val minSplits: Int = -1, val exactSplits: Int = -1) extends AbstractUnaryOp[K, K] { + /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + override def keyClassTag = A.keyClassTag + /** R-like syntax for number of rows. 
*/ def nrow: Long = A.nrow http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala index d45714b..f8c1059 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala @@ -22,15 +22,22 @@ import org.apache.mahout.math.drm.DrmLike import scala.util.Random /** rbind() logical operator */ -case class OpRbind[K: ClassTag]( +case class OpRbind[K]( override var A: DrmLike[K], override var B: DrmLike[K] ) extends AbstractBinaryOp[K, K, K] { assert(A.ncol == B.ncol, "arguments must have same number of columns") + require(A.keyClassTag == B.keyClassTag, "arguments of rbind() must have the same row key type") override protected[mahout] lazy val partitioningTag: Long = Random.nextLong() + /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + override def keyClassTag = A.keyClassTag + /** R-like syntax for number of rows. 
*/ def nrow: Long = A.nrow + B.nrow http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala index 697bbd3..c7d3bfa 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRowRange.scala @@ -19,6 +19,8 @@ package org.apache.mahout.math.drm.logical import org.apache.mahout.math.drm.DrmLike +import scala.reflect.ClassTag + /** Logical row-range slicing */ case class OpRowRange( override var A: DrmLike[Int], @@ -27,6 +29,12 @@ case class OpRowRange( assert(rowRange.head >= 0 && rowRange.last < A.nrow, "row range out of range") + /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + override val keyClassTag = ClassTag.Int + /** R-like syntax for number of rows. 
*/ def nrow: Long = rowRange.length http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala index 1ca79b3..e8ac475 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala @@ -22,6 +22,8 @@ import org.apache.mahout.math.scalabindings._ import RLikeOps._ import org.apache.mahout.math.drm.DrmLike +import scala.reflect.ClassTag + /** Logical Times-left over in-core matrix operand */ case class OpTimesLeftMatrix( val left: Matrix, @@ -30,6 +32,12 @@ case class OpTimesLeftMatrix( assert(left.ncol == A.nrow, "Incompatible operand geometry") + /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + override val keyClassTag = ClassTag.Int + /** R-like syntax for number of rows. 
*/ def nrow: Long = left.nrow http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala index c55f7f0..1b12035 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala @@ -24,7 +24,7 @@ import RLikeOps._ import org.apache.mahout.math.drm.DrmLike /** Logical times-right over in-core matrix operand. */ -case class OpTimesRightMatrix[K: ClassTag]( +case class OpTimesRightMatrix[K]( override var A: DrmLike[K], val right: Matrix ) extends AbstractUnaryOp[K, K] { @@ -33,6 +33,12 @@ case class OpTimesRightMatrix[K: ClassTag]( assert(A.ncol == right.nrow, "Incompatible operand geometry") + /** + * Explicit extraction of key class Tag since traits don't support context bound access; but actual + * implementation knows it + */ + override lazy val keyClassTag = A.keyClassTag + /** R-like syntax for number of rows. */ def nrow: Long = A.nrow http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala index 6d62ff1..0b7bb8c 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala @@ -85,20 +85,20 @@ package object drm { /** Just throw all engine operations into context as well. 
*/ implicit def ctx2engine(ctx: DistributedContext): DistributedEngine = ctx.engine - implicit def drm2drmCpOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedOps[K] = + implicit def drm2drmCpOps[K](drm: CheckpointedDrm[K]): CheckpointedOps[K] = new CheckpointedOps[K](drm) /** * We assume that whenever computational action is invoked without explicit checkpoint, the user * doesn't imply caching */ - implicit def drm2Checkpointed[K: ClassTag](drm: DrmLike[K]): CheckpointedDrm[K] = drm.checkpoint(CacheHint.NONE) + implicit def drm2Checkpointed[K](drm: DrmLike[K]): CheckpointedDrm[K] = drm.checkpoint(CacheHint.NONE) /** Implicit conversion to in-core with NONE caching of the result. */ - implicit def drm2InCore[K: ClassTag](drm: DrmLike[K]): Matrix = drm.collect + implicit def drm2InCore[K](drm: DrmLike[K]): Matrix = drm.collect /** Do vertical concatenation of collection of blockified tuples */ - private[mahout] def rbind[K: ClassTag](blocks: Iterable[BlockifiedDrmTuple[K]]): BlockifiedDrmTuple[K] = { + private[mahout] def rbind[K:ClassTag](blocks: Iterable[BlockifiedDrmTuple[K]]): BlockifiedDrmTuple[K] = { assert(blocks.nonEmpty, "rbind: 0 blocks passed in") if (blocks.size == 1) { // No coalescing required. @@ -132,7 +132,7 @@ package object drm { * key type is actually Int, then we just return the argument with None for the map, * regardless of computeMap parameter. 
*/ - def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) = + def drm2IntKeyed[K](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) = drmX.context.engine.drm2IntKeyed(drmX, computeMap) /** @@ -143,23 +143,23 @@ package object drm { * @tparam K * @return samples */ - def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K] = + def drmSampleRows[K](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K] = drmX.context.engine.drmSampleRows(drmX, fraction, replacement) - def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples: Int, replacement: Boolean = false): Matrix = + def drmSampleKRows[K](drmX: DrmLike[K], numSamples: Int, replacement: Boolean = false): Matrix = drmX.context.engine.drmSampleKRows(drmX, numSamples, replacement) /////////////////////////////////////////////////////////// // Elementwise unary functions on distributed operands. 
- def dexp[K: ClassTag](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.exp, true) + def dexp[K](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.exp, true) - def dlog[K: ClassTag](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.log, true) + def dlog[K](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.log, true) - def dabs[K: ClassTag](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.abs) + def dabs[K](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.abs) - def dsqrt[K: ClassTag](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.sqrt) + def dsqrt[K](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.sqrt) - def dsignum[K: ClassTag](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.signum) + def dsignum[K](drmA: DrmLike[K]): DrmLike[K] = new OpAewUnaryFunc[K](drmA, math.signum) /////////////////////////////////////////////////////////// // Misc. math utilities. 
@@ -171,7 +171,7 @@ package object drm { * @tparam K * @return colMeans → colVariances */ - def dcolMeanVars[K: ClassTag](drmA: DrmLike[K]): (Vector, Vector) = { + def dcolMeanVars[K](drmA: DrmLike[K]): (Vector, Vector) = { import RLikeDrmOps._ @@ -190,7 +190,7 @@ package object drm { * @param drmA note: input will be pinned to cache if not yet pinned * @return colMeans → colStdevs */ - def dcolMeanStdevs[K: ClassTag](drmA: DrmLike[K]): (Vector, Vector) = { + def dcolMeanStdevs[K](drmA: DrmLike[K]): (Vector, Vector) = { val (mu, vars) = dcolMeanVars(drmA) mu → (vars ::= math.sqrt _) } http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeOpsSuiteBase.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeOpsSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeOpsSuiteBase.scala index fdfb3f9..525da11 100644 --- a/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeOpsSuiteBase.scala +++ b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeOpsSuiteBase.scala @@ -24,16 +24,18 @@ import scalabindings._ import RLikeOps._ import RLikeDrmOps._ +import scala.reflect.{ClassTag,classTag} + /** Common tests for DrmLike operators to be executed by all distributed engines. 
*/ trait DrmLikeOpsSuiteBase extends DistributedMahoutSuite with Matchers { - this: FunSuite => + this: FunSuite ⇒ test("mapBlock") { val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6)) val A = drmParallelize(m = inCoreA, numPartitions = 2) val B = A.mapBlock(/* Inherit width */) { - case (keys, block) => keys -> (block += 1.0) + case (keys, block) ⇒ keys → (block += 1.0) } val inCoreB = B.collect @@ -43,9 +45,23 @@ trait DrmLikeOpsSuiteBase extends DistributedMahoutSuite with Matchers { // Assert they are the same (inCoreB - inCoreBControl).norm should be < 1E-10 + B.keyClassTag shouldBe ClassTag.Int } + test ("mapBlock implicit keying") { + + val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6)) + val A = drmParallelize(m = inCoreA, numPartitions = 2) + val B = A.mapBlock(/* Inherit width */) { + case (keys, block) ⇒ keys.map { k ⇒ k.toString } → block + } + + B.keyClassTag shouldBe classTag[String] + + } + + test("allReduceBlock") { val mxA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6)) http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/spark/src/main/scala/org/apache/mahout/classifier/naivebayes/SparkNaiveBayes.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/classifier/naivebayes/SparkNaiveBayes.scala b/spark/src/main/scala/org/apache/mahout/classifier/naivebayes/SparkNaiveBayes.scala index ca06cc0..f76a3f9 100644 --- a/spark/src/main/scala/org/apache/mahout/classifier/naivebayes/SparkNaiveBayes.scala +++ b/spark/src/main/scala/org/apache/mahout/classifier/naivebayes/SparkNaiveBayes.scala @@ -58,7 +58,7 @@ object SparkNaiveBayes extends NaiveBayes{ * aggregatedByLabelObservationDrm is a DrmLike[Int] of aggregated * TF or TF-IDF counts per label */ - override def extractLabelsAndAggregateObservations[K: ClassTag](stringKeyedObservations: DrmLike[K], + override def extractLabelsAndAggregateObservations[K](stringKeyedObservations: DrmLike[K], 
cParser: CategoryParser = seq2SparseCategoryParser) (implicit ctx: DistributedContext): (mutable.HashMap[String, Integer], DrmLike[Int]) = { http://git-wip-us.apache.org/repos/asf/mahout/blob/6919fd9f/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala index 0eed8d4..e9f2f95 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala @@ -79,7 +79,7 @@ object TrainNBDriver extends MahoutSparkDriver { } /** Read the training set from inputPath/part-x-00000 sequence file of form <Text,VectorWritable> */ - private def readTrainingSet: DrmLike[_]= { + private def readTrainingSet(): DrmLike[_]= { val inputPath = parser.opts("input").asInstanceOf[String] val trainingSet= drm.drmDfsRead(inputPath) trainingSet @@ -99,7 +99,7 @@ object TrainNBDriver extends MahoutSparkDriver { Hadoop1HDFSUtil.delete(fullPathToModel) } - val trainingSet = readTrainingSet + val trainingSet = readTrainingSet() // Use Spark-Optimized Naive Bayes here to extract labels and aggregate options val (labelIndex, aggregatedObservations) = SparkNaiveBayes.extractLabelsAndAggregateObservations(trainingSet) val model = SparkNaiveBayes.train(aggregatedObservations, labelIndex, complementary, alpha.toFloat)
