Repository: mahout
Updated Branches:
  refs/heads/master fcd6b9e01 -> e8b9c8003
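Editor's note: the diff below removes the `[K: ClassTag]` context bounds from the Spark engine and blas operators. Instead of asking every caller for evidence, each method now recovers the key-type tag from the operand itself (`drm.keyClassTag`, `op.keyClassTag`, `cp.keyClassTag`) and re-installs it as a local `implicit val`. A minimal sketch of that pattern, using hypothetical stand-ins (`Drm`, `colSumsLike`) rather than Mahout's real traits:

    import scala.reflect.ClassTag

    // Hypothetical stand-in for a DRM that carries its own key-type evidence.
    trait Drm[K] {
      def keyClassTag: ClassTag[K]
    }

    object EngineSketch {
      // Before: def colSumsLike[K: ClassTag](drm: Drm[K]) -- every caller had to supply the tag.
      // After: no context bound; the tag travels with the data and is put back into implicit scope.
      def colSumsLike[K](drm: Drm[K]): String = {
        implicit val ktag: ClassTag[K] = drm.keyClassTag
        // ktag now satisfies downstream calls needing ClassTag[K] and can drive runtime dispatch:
        if (ktag == ClassTag.Int) "int-keyed path" else "generic path"
      }
      // e.g. colSumsLike(new Drm[Int] { val keyClassTag = ClassTag.Int })
    }

The `drm2IntKeyed`, `drmSampleRows` and `tr2phys` hunks below use the same trick, and guard Int-only physical operators with `if (op.keyClassTag == ClassTag.Int)` before casting the result back to `DrmRddInput[K]`.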
http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 42ddceb..99412df 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -24,7 +24,7 @@ import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.drm.logical._
-import org.apache.mahout.sparkbindings.drm.{cpDrmGeneric2DrmRddInput, CheckpointedDrmSpark, DrmRddInput}
+import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSparkOps, cpDrmGeneric2DrmRddInput, CheckpointedDrmSpark, DrmRddInput}
 import org.apache.mahout.math._
 import scala.Predef
 import scala.reflect.ClassTag
@@ -46,7 +46,7 @@ object SparkEngine extends DistributedEngine {
   // By default, use Hadoop 1 utils
   var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
-  def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
+  def colSums[K](drm: CheckpointedDrm[K]): Vector = {
     val n = drm.ncol
     drm.rdd
@@ -56,7 +56,7 @@ object SparkEngine extends DistributedEngine {
       // Fold() doesn't work with kryo still. So work around it.
       .mapPartitions(iter ⇒ {
-        val acc = ((new DenseVector(n): Vector) /: iter)((acc, v) ⇒ acc += v)
+        val acc = ((new DenseVector(n): Vector) /: iter) ((acc, v) ⇒ acc += v)
         Iterator(acc)
       })
@@ -65,7 +65,7 @@ object SparkEngine extends DistributedEngine {
       .reduce(_ += _)
   }
-  def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
+  def numNonZeroElementsPerColumn[K](drm: CheckpointedDrm[K]): Vector = {
     val n = drm.ncol
     drm.rdd
@@ -76,7 +76,7 @@ object SparkEngine extends DistributedEngine {
       // Fold() doesn't work with kryo still. So work around it.
       .mapPartitions(iter ⇒ {
         val acc = ((new DenseVector(n): Vector) /: iter) { (acc, v) ⇒
-          v.nonZeroes().foreach { elem ⇒ acc(elem.index) += 1}
+          v.nonZeroes().foreach { elem ⇒ acc(elem.index) += 1 }
          acc
        }
        Iterator(acc)
@@ -87,10 +87,10 @@
   }
   /** Engine-specific colMeans implementation based on a checkpoint. */
-  override def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector =
+  override def colMeans[K](drm: CheckpointedDrm[K]): Vector =
     if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
-  override def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double =
+  override def norm[K](drm: CheckpointedDrm[K]): Double =
     drm.rdd
     // Compute sum of squares of each vector
     .map {
@@ -100,7 +100,7 @@
   /** Optional engine-specific all reduce tensor operation. */
-  override def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf:
+  override def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf:
   BlockReduceFunc): Matrix = {
     import drm._
@@ -108,11 +108,11 @@
   }
   /**
-   * Perform default expression rewrite. Return physical plan that we can pass to exec(). <P>
-   *
-   * A particular physical engine implementation may choose to either use or not use these rewrites
-   * as a useful basic rewriting rule.<P>
-   */
+    * Perform default expression rewrite. Return physical plan that we can pass to exec(). <P>
+    *
+    * A particular physical engine implementation may choose to either use or not use these rewrites
+    * as a useful basic rewriting rule.<P>
+    */
   override def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = super.optimizerRewrite(action)
@@ -139,14 +139,13 @@ object SparkEngine extends DistributedEngine {
   def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] = dc.broadcast(m)
   /**
-   * Load DRM from hdfs (as in Mahout DRM format)
-   *
-   * @param path
-   * @param sc spark context (wanted to make that implicit, doesn't work in current version of
-   *           scala with the type bounds, sorry)
-   *
-   * @return DRM[Any] where Any is automatically translated to value type
-   */
+    * Load DRM from hdfs (as in Mahout DRM format)
+    *
+    * @param path
+    * @param sc spark context (wanted to make that implicit, doesn't work in current version of
+    *           scala with the type bounds, sorry)
+    * @return DRM[Any] where Any is automatically translated to value type
+    */
   def drmDfsRead(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
     // Require that context is actually Spark context.
@@ -163,7 +162,7 @@ object SparkEngine extends DistributedEngine {
     val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minPartitions = parMin)
         // Immediately convert keys and value writables into value types.
-        .map { case (wKey, wVec) ⇒ k2vFunc(wKey) -> wVec.get()}
+        .map { case (wKey, wVec) ⇒ k2vFunc(wKey) -> wVec.get() }
     // Wrap into a DRM type with correct matrix row key class tag evident.
     drmWrap(rdd = rdd, cacheHint = CacheHint.NONE)(drmMetadata.keyClassTag.asInstanceOf[ClassTag[Any]])
@@ -221,11 +220,12 @@ object SparkEngine extends DistributedEngine {
   }
   /**
-   * Convert non-int-keyed matrix to an int-keyed, computing optionally mapping from old keys
-   * to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix.
-   */
-  override def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) = {
-    if (classTag[K] == ClassTag.Int) {
+    * Convert non-int-keyed matrix to an int-keyed, computing optionally mapping from old keys
+    * to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix.
+    */
+  override def drm2IntKeyed[K](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) = {
+    implicit val ktag = drmX.keyClassTag
+    if (ktag == ClassTag.Int) {
       drmX.asInstanceOf[DrmLike[Int]] → None
     } else {
@@ -237,26 +237,29 @@ object SparkEngine extends DistributedEngine {
       val (intRdd, keyMap) = blas.rekeySeqInts(rdd = drmXcp.rdd, computeMap = computeMap)
       // Convert computed key mapping to a matrix.
-      val mxKeyMap = keyMap.map { rdd =>
-        drmWrap(rdd = rdd.map { case (key, ordinal) ⇒ key → (dvec(ordinal):Vector)}, ncol = 1, nrow = nrow)
+      val mxKeyMap = keyMap.map { rdd ⇒
+        drmWrap(rdd = rdd.map { case (key, ordinal) ⇒ key → (dvec(ordinal): Vector) }, ncol = 1, nrow = nrow)
       }
       drmWrap(rdd = intRdd, ncol = ncol) → mxKeyMap
-    }
+    }
   }
   /**
-   * (Optional) Sampling operation. Consistent with Spark semantics of the same.
-   * @param drmX
-   * @param fraction
-   * @param replacement
-   * @tparam K
-   * @return
-   */
-  override def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean): DrmLike[K] = {
+    * (Optional) Sampling operation. Consistent with Spark semantics of the same.
+    *
+    * @param drmX
+    * @param fraction
+    * @param replacement
+    * @tparam K
+    * @return
+    */
+  override def drmSampleRows[K](drmX: DrmLike[K], fraction: Double, replacement: Boolean): DrmLike[K] = {
+
+    implicit val ktag = drmX.keyClassTag
    // We do want to take ncol if already computed, if not, then we don't want to trigger computation
    // here.
@@ -265,14 +268,14 @@ object SparkEngine extends DistributedEngine {
       case _ ⇒ -1
     }
     val sample = drmX.rdd.sample(withReplacement = replacement, fraction = fraction)
-    if (classTag[K] != ClassTag.Int) return drmWrap(sample, ncol = ncol)
+    if (ktag != ClassTag.Int) return drmWrap(sample, ncol = ncol)
     // K == Int: Int-keyed sample. rebase int counts.
     drmWrap(rdd = blas.rekeySeqInts(rdd = sample, computeMap = false)._1, ncol = ncol).asInstanceOf[DrmLike[K]]
   }
-  override def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples: Int, replacement: Boolean): Matrix = {
+  override def drmSampleKRows[K](drmX: DrmLike[K], numSamples: Int, replacement: Boolean): Matrix = {
     val ncol = drmX match {
       case cp: CheckpointedDrmSpark[K] ⇒ cp._ncol
@@ -286,9 +289,9 @@
     val isSparse = sample.exists { case (_, vec) ⇒ !vec.isDense }
     val vectors = sample.map(_._2)
-    val labels = sample.view.zipWithIndex.map { case ((key, _), idx) ⇒ key.toString → (idx:Integer) }.toMap
+    val labels = sample.view.zipWithIndex.map { case ((key, _), idx) ⇒ key.toString → (idx: Integer) }.toMap
-    val mx:Matrix = if (isSparse) sparse(vectors:_*) else dense(vectors)
+    val mx: Matrix = if (isSparse) sparse(vectors: _*) else dense(vectors)
     mx.setRowLabelBindings(labels)
     mx
@@ -301,7 +304,7 @@
     case CacheHint.MEMORY_ONLY ⇒ StorageLevel.MEMORY_ONLY
     case CacheHint.MEMORY_ONLY_2 ⇒ StorageLevel.MEMORY_ONLY_2
     case CacheHint.MEMORY_ONLY_SER ⇒ StorageLevel.MEMORY_ONLY_SER
-    case CacheHint.MEMORY_ONLY_SER_2 ⇒ StorageLevel.MEMORY_ONLY_SER_2
+    case CacheHint.MEMORY_ONLY_SER_2 ⇒ StorageLevel.MEMORY_ONLY_SER_2
     case CacheHint.MEMORY_AND_DISK ⇒ StorageLevel.MEMORY_AND_DISK
     case CacheHint.MEMORY_AND_DISK_2 ⇒ StorageLevel.MEMORY_AND_DISK_2
     case CacheHint.MEMORY_AND_DISK_SER ⇒ StorageLevel.MEMORY_AND_DISK_SER
@@ -309,7 +312,7 @@
   }
   /** Translate previously optimized physical plan */
-  private def tr2phys[K: ClassTag](oper: DrmLike[K]): DrmRddInput[K] = {
+  private def tr2phys[K](oper: DrmLike[K]): DrmRddInput[K] = {
     // I do explicit evidence propagation here since matching via case classes seems to be loosing
     // it and subsequently may cause something like DrmRddInput[Any] instead of [Int] or [String].
     // Hence you see explicit evidence attached to all recursive exec() calls.
@@ -319,28 +322,32 @@ object SparkEngine extends DistributedEngine {
       // (we cannot do actual flip for non-int-keyed arguments)
       case OpAtAnyKey(_) ⇒
        throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.")
-      case op@OpAt(a) ⇒ At.at(op, tr2phys(a)(op.classTagA))
-      case op@OpABt(a, b) ⇒ ABt.abt(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpAtB(a, b) ⇒ AtB.atb(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpAtA(a) ⇒ AtA.at_a(op, tr2phys(a)(op.classTagA))
-      case op@OpAx(a, x) ⇒ Ax.ax_with_broadcast(op, tr2phys(a)(op.classTagA))
-      case op@OpAtx(a, x) ⇒ Ax.atx_with_broadcast(op, tr2phys(a)(op.classTagA))
-      case op@OpAewUnaryFunc(a, _, _) ⇒ AewB.a_ew_func(op, tr2phys(a)(op.classTagA))
-      case op@OpAewUnaryFuncFusion(a, _) ⇒ AewB.a_ew_func(op, tr2phys(a)(op.classTagA))
-      case op@OpAewB(a, b, opId) ⇒ AewB.a_ew_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpCbind(a, b) ⇒ CbindAB.cbindAB_nograph(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpCbindScalar(a, _, _) ⇒ CbindAB.cbindAScalar(op, tr2phys(a)(op.classTagA))
-      case op@OpRbind(a, b) ⇒ RbindAB.rbindAB(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
-      case op@OpAewScalar(a, s, _) ⇒ AewB.a_ew_scalar(op, tr2phys(a)(op.classTagA), s)
-      case op@OpRowRange(a, _) ⇒ Slicing.rowRange(op, tr2phys(a)(op.classTagA))
-      case op@OpTimesRightMatrix(a, _) ⇒ AinCoreB.rightMultiply(op, tr2phys(a)(op.classTagA))
+      case op@OpAt(a) if op.keyClassTag == ClassTag.Int ⇒ At.at(op, tr2phys(a)).asInstanceOf[DrmRddInput[K]]
+      case op@OpABt(a, b) ⇒ ABt.abt(op, tr2phys(a), tr2phys(b))
+      case op@OpAtB(a, b) ⇒ AtB.atb(op, tr2phys(a), tr2phys(b)).asInstanceOf[DrmRddInput[K]]
+      case op@OpAtA(a) if op.keyClassTag == ClassTag.Int ⇒ AtA.at_a(op, tr2phys(a)).asInstanceOf[DrmRddInput[K]]
+      case op@OpAx(a, x) ⇒ Ax.ax_with_broadcast(op, tr2phys(a))
+      case op@OpAtx(a, x) if op.keyClassTag == ClassTag.Int ⇒
+        Ax.atx_with_broadcast(op, tr2phys(a)).asInstanceOf[DrmRddInput[K]]
+      case op@OpAewUnaryFunc(a, _, _) ⇒ AewB.a_ew_func(op, tr2phys(a))
+      case op@OpAewUnaryFuncFusion(a, _) ⇒ AewB.a_ew_func(op, tr2phys(a))
+      case op@OpAewB(a, b, opId) ⇒ AewB.a_ew_b(op, tr2phys(a), tr2phys(b))
+      case op@OpCbind(a, b) ⇒ CbindAB.cbindAB_nograph(op, tr2phys(a), tr2phys(b))
+      case op@OpCbindScalar(a, _, _) ⇒ CbindAB.cbindAScalar(op, tr2phys(a))
+      case op@OpRbind(a, b) ⇒ RbindAB.rbindAB(op, tr2phys(a), tr2phys(b))
+      case op@OpAewScalar(a, s, _) ⇒ AewB.a_ew_scalar(op, tr2phys(a), s)
+      case op@OpRowRange(a, _) if op.keyClassTag == ClassTag.Int ⇒
+        Slicing.rowRange(op, tr2phys(a)).asInstanceOf[DrmRddInput[K]]
+      case op@OpTimesRightMatrix(a, _) ⇒ AinCoreB.rightMultiply(op, tr2phys(a))
       // Custom operators, we just execute them
-      case blockOp: OpMapBlock[K, _] ⇒ MapBlock.exec(
-        src = tr2phys(blockOp.A)(blockOp.classTagA),
+      case blockOp: OpMapBlock[_, K] ⇒ MapBlock.exec(
+        src = tr2phys(blockOp.A),
         operator = blockOp
       )
-      case op@OpPar(a, _, _) ⇒ Par.exec(op, tr2phys(a)(op.classTagA))
-      case cp: CheckpointedDrm[K] ⇒ cp.rdd: DrmRddInput[K]
+      case op@OpPar(a, _, _) ⇒ Par.exec(op, tr2phys(a))
+      case cp: CheckpointedDrm[K] ⇒
+        implicit val ktag=cp.keyClassTag
+        cp.rdd: DrmRddInput[K]
       case _ ⇒ throw new IllegalArgumentException("Internal:Optimizer has no exec policy for operator %s."
           .format(oper))
@@ -348,32 +355,34 @@ object SparkEngine extends DistributedEngine {
   }
   /**
-   * Returns an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from default text
-   * delimited files. Reads a vector per row.
-   * @param src a comma separated list of URIs to read from
-   * @param schema how the text file is formatted
-   */
+    * Returns an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from default text
+    * delimited files. Reads a vector per row.
+    *
+    * @param src a comma separated list of URIs to read from
+    * @param schema how the text file is formatted
+    */
   def indexedDatasetDFSRead(src: String,
-                            schema: Schema = DefaultIndexedDatasetReadSchema,
-                            existingRowIDs: Option[BiDictionary] = None)
-                           (implicit sc: DistributedContext):
-  IndexedDatasetSpark = {
+      schema: Schema = DefaultIndexedDatasetReadSchema,
+      existingRowIDs: Option[BiDictionary] = None)
+      (implicit sc: DistributedContext):
+  IndexedDatasetSpark = {
     val reader = new TextDelimitedIndexedDatasetReader(schema)(sc)
     val ids = reader.readRowsFrom(src, existingRowIDs)
     ids
   }
   /**
-   * Returns an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from default text
-   * delimited files. Reads an element per row.
-   * @param src a comma separated list of URIs to read from
-   * @param schema how the text file is formatted
-   */
+    * Returns an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from default text
+    * delimited files. Reads an element per row.
+    *
+    * @param src a comma separated list of URIs to read from
+    * @param schema how the text file is formatted
+    */
   def indexedDatasetDFSReadElements(src: String,
-                                    schema: Schema = DefaultIndexedDatasetElementReadSchema,
-                                    existingRowIDs: Option[BiDictionary] = None)
-                                   (implicit sc: DistributedContext):
-  IndexedDatasetSpark = {
+      schema: Schema = DefaultIndexedDatasetElementReadSchema,
+      existingRowIDs: Option[BiDictionary] = None)
+      (implicit sc: DistributedContext):
+  IndexedDatasetSpark = {
     val reader = new TextDelimitedIndexedDatasetReader(schema)(sc)
     val ids = reader.readElementsFrom(src, existingRowIDs)
     ids

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
index 11e2bad..5142d3b 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
@@ -44,13 +44,13 @@ object ABt {
    * @param srcB B source RDD
    * @tparam K
    */
-  def abt[K: ClassTag](
+  def abt[K](
       operator: OpABt[K],
       srcA: DrmRddInput[K],
       srcB: DrmRddInput[Int]): DrmRddInput[K] = {
     debug("operator AB'(Spark)")
-    abt_nograph(operator, srcA, srcB)
+    abt_nograph(operator, srcA, srcB)(operator.keyClassTag)
   }
   /**

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
index 8a90398..d8637d2 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
@@ -17,20 +17,18 @@
 package org.apache.mahout.sparkbindings.blas
-import org.apache.mahout.sparkbindings.drm.DrmRddInput
-import scala.reflect.ClassTag
-import org.apache.spark.SparkContext._
+import org.apache.mahout.logging._
 import org.apache.mahout.math._
-import scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.{SequentialAccessSparseVector, Matrix, Vector}
-import org.apache.mahout.math.drm.logical.{AbstractUnaryOp, TEwFunc, OpAewScalar, OpAewB}
-import org.apache.mahout.sparkbindings.blas.AewB.{ReduceFuncScalar, ReduceFunc}
-import org.apache.mahout.sparkbindings.{BlockifiedDrmRdd, DrmRdd, drm}
 import org.apache.mahout.math.drm._
-import org.apache.mahout.logging._
-import collection._
-import JavaConversions._
+import org.apache.mahout.math.drm.logical.{AbstractUnaryOp, OpAewB, OpAewScalar, TEwFunc}
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.sparkbindings.blas.AewB.{ReduceFunc, ReduceFuncScalar}
+import org.apache.mahout.sparkbindings.drm.DrmRddInput
+import org.apache.mahout.sparkbindings.{BlockifiedDrmRdd, DrmRdd, drm}
+
+import scala.reflect.{ClassTag, classTag}
+import scala.collection.JavaConversions._
 /** Elementwise drm-drm operators */
 object AewB {
@@ -53,7 +51,9 @@ object AewB {
   /** Elementwise matrix-matrix operator, now handles both non- and identically partitioned */
-  def a_ew_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
+  def a_ew_b[K](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
+
+    implicit val ktag = op.keyClassTag
     val ewOps = getEWOps()
     val opId = op.op
@@ -111,15 +111,16 @@ object AewB {
     rdd
   }
-  def a_ew_func[K:ClassTag](op:AbstractUnaryOp[K,K] with TEwFunc, srcA: DrmRddInput[K]):DrmRddInput[K] = {
+  def a_ew_func[K](op:AbstractUnaryOp[K,K] with TEwFunc, srcA: DrmRddInput[K]):DrmRddInput[K] = {
     val evalZeros = op.evalZeros
     val inplace = ewInplace()
     val f = op.f
+    implicit val ktag = op.keyClassTag
     // Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing
     // rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar.
-    val aBlockRdd = if (implicitly[ClassTag[K]] == ClassTag.Int && op.A.canHaveMissingRows && evalZeros) {
+    val aBlockRdd = if (classTag[K] == ClassTag.Int && op.A.canHaveMissingRows && evalZeros) {
      val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
      drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
     } else {
@@ -149,12 +150,13 @@ object AewB {
   }
   /** Physical algorithm to handle matrix-scalar operators like A - s or s -: A */
-  def a_ew_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double):
+  def a_ew_scalar[K](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double):
   DrmRddInput[K] = {
     val ewOps = getEWOps()
     val opId = op.op
+    implicit val ktag = op.keyClassTag
     val reduceFunc = opId match {
       case "+" => ewOps.plusScalar
@@ -168,7 +170,7 @@ object AewB {
     // Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing
     // rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar.
-    val aBlockRdd = if (implicitly[ClassTag[K]] == ClassTag.Int && op.A.canHaveMissingRows) {
+    val aBlockRdd = if (classTag[K] == ClassTag.Int && op.A.canHaveMissingRows) {
      val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
      drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
     } else {

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
index 5f9f84a..6fe076e 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
@@ -17,7 +17,10 @@ object AinCoreB {
   private final implicit val log = getLog(AinCoreB.getClass)
-  def rightMultiply[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
+  def rightMultiply[K](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
+
+    implicit val ktag = op.keyClassTag
+
     if ( op.right.isInstanceOf[DiagonalMatrix])
       rightMultiply_diag(op, srcA)
     else

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
index 629accd..42e56e7 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
@@ -13,10 +13,11 @@ import org.apache.mahout.math.drm.logical.{OpAx, OpAtx}
 /** Matrix product with one of operands an in-core matrix */
 object Ax {
-  def ax_with_broadcast[K: ClassTag](op: OpAx[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
+  def ax_with_broadcast[K](op: OpAx[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
     val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol)
     implicit val sc: DistributedContext = rddA.sparkContext
+    implicit val ktag = op.keyClassTag
     val bcastX = drmBroadcast(op.x)

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
index 4a379ec..e900749 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
@@ -32,7 +32,9 @@ object CbindAB {
   private val log = Logger.getLogger(CbindAB.getClass)
-  def cbindAScalar[K:ClassTag](op: OpCbindScalar[K], srcA:DrmRddInput[K]) : DrmRddInput[K] = {
+  def cbindAScalar[K](op: OpCbindScalar[K], srcA:DrmRddInput[K]) : DrmRddInput[K] = {
+
+    implicit val ktag = op.keyClassTag
     val srcRdd = srcA.toDrmRdd()
     val ncol = op.A.ncol
@@ -60,13 +62,14 @@ object CbindAB {
     resultRdd
   }
-  def cbindAB_nograph[K: ClassTag](op: OpCbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
+  def cbindAB_nograph[K](op: OpCbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
     val a = srcA.toDrmRdd()
     val b = srcB.toDrmRdd()
     val n = op.ncol
     val n1 = op.A.ncol
     val n2 = n - n1
+    implicit val ktag = op.keyClassTag
     // Check if A and B are identically partitioned AND keyed. if they are, then just perform zip
     // instead of join, and apply the op map-side. Otherwise, perform join and apply the op

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
index 4cd9a74..6104d83 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
@@ -23,7 +23,7 @@ import RLikeOps._
 import org.apache.mahout.math.{SequentialAccessSparseVector, DenseVector}
 import org.apache.mahout.sparkbindings.DrmRdd
-class DrmRddOps[K: ClassTag](private[blas] val rdd: DrmRdd[K]) {
+class DrmRddOps[K](private[blas] val rdd: DrmRdd[K]) {
   /** Turn RDD into dense row-wise vectors if density threshold is exceeded. */
   def densify(threshold: Double = 0.80): DrmRdd[K] = rdd.map({

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
index 2933ddc..7e48ed8 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
@@ -25,13 +25,14 @@ import scala.reflect.ClassTag
 object MapBlock {
-  def exec[S, R:ClassTag](src: DrmRddInput[S], operator:OpMapBlock[S,R]): DrmRddInput[R] = {
+  def exec[S, R](src: DrmRddInput[S], operator:OpMapBlock[S,R]): DrmRddInput[R] = {
     // We can't use attributes directly in the closure in order to avoid putting the whole object
     // into closure.
     val bmf = operator.bmf
     val ncol = operator.ncol
-    val rdd = src.toBlockifiedDrmRdd(operator.A.ncol).map(blockTuple => {
+    implicit val rtag = operator.keyClassTag
+    src.toBlockifiedDrmRdd(operator.A.ncol).map(blockTuple => {
       val out = bmf(blockTuple)
       assert(out._2.nrow == blockTuple._2.nrow, "block mapping must return same number of rows.")
@@ -39,8 +40,6 @@ object MapBlock {
       out
     })
-
-    rdd
   }
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
index 0434a72..7e32b69 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
@@ -15,8 +15,9 @@ object Par {
   private final implicit val log = getLog(Par.getClass)
-  def exec[K: ClassTag](op: OpPar[K], src: DrmRddInput[K]): DrmRddInput[K] = {
+  def exec[K](op: OpPar[K], src: DrmRddInput[K]): DrmRddInput[K] = {
+    implicit val ktag = op.keyClassTag
     val srcBlockified = src.isBlockified
     val srcRdd = if (srcBlockified) src.toBlockifiedDrmRdd(op.ncol) else src.toDrmRdd()

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
index 62abba6..14772f6 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
@@ -27,7 +27,9 @@ object RbindAB {
   private val log = Logger.getLogger(RbindAB.getClass)
-  def rbindAB[K: ClassTag](op: OpRbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
+  def rbindAB[K](op: OpRbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
+
+    implicit val ktag = op.keyClassTag
     // If any of the inputs is blockified, use blockified inputs
     if (srcA.isBlockified || srcB.isBlockified) {

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
index 6b8513f..5a83f80 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
@@ -35,7 +35,7 @@ import JavaConversions._
  */
 package object blas {
-  implicit def drmRdd2ops[K: ClassTag](rdd: DrmRdd[K]): DrmRddOps[K] = new DrmRddOps[K](rdd)
+  implicit def drmRdd2ops[K](rdd: DrmRdd[K]): DrmRddOps[K] = new DrmRddOps[K](rdd)
   /**
@@ -46,7 +46,7 @@ package object blas {
    * @tparam K existing key parameter
    * @return
    */
-  private[mahout] def rekeySeqInts[K: ClassTag](rdd: DrmRdd[K], computeMap: Boolean = true): (DrmRdd[Int],
+  private[mahout] def rekeySeqInts[K](rdd: DrmRdd[K], computeMap: Boolean = true): (DrmRdd[Int],
     Option[RDD[(K, Int)]]) = {
     // Spark context please.
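Editor's note: `rekeySeqInts` above also loses its context bound. For readers unfamiliar with the operation, a rough sketch of what re-keying to sequential Int keys can look like in plain Spark; the helper name `rekeyToInts` and the `zipWithIndex`-based approach are assumptions for illustration, not Mahout's implementation:

    import org.apache.spark.rdd.RDD

    // Sketch only (cf. blas.rekeySeqInts): re-key an arbitrary-keyed row RDD to sequential
    // Int keys, optionally returning the old-key -> new-index mapping.
    def rekeyToInts[K, V](rows: RDD[(K, V)], computeMap: Boolean): (RDD[(Int, V)], Option[RDD[(K, Int)]]) = {
      val indexed  = rows.zipWithIndex()                                  // ((K, V), Long)
      val intKeyed = indexed.map { case ((_, v), i) => i.toInt -> v }     // sequential Int keys
      val keyMap   = if (computeMap) Some(indexed.map { case ((k, _), i) => k -> i.toInt }) else None
      (intKeyed, keyMap)
    }

The optional mapping is what `drm2IntKeyed` wraps into a 1-column matrix in the SparkEngine hunk above.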
http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
index abcfc64..3c086fe 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
@@ -1,16 +1,17 @@
 package org.apache.mahout.sparkbindings.drm
 import org.apache.mahout.math.drm.CheckpointedDrm
+import org.apache.mahout.sparkbindings.DrmRdd
 import scala.reflect.ClassTag
 /** Additional Spark-specific operations. Requires underlying DRM to be running on Spark backend. */
-class CheckpointedDrmSparkOps[K: ClassTag](drm: CheckpointedDrm[K]) {
+class CheckpointedDrmSparkOps[K](drm: CheckpointedDrm[K]) {
   assert(drm.isInstanceOf[CheckpointedDrmSpark[K]], "must be a Spark-backed matrix")
   private[sparkbindings] val sparkDrm = drm.asInstanceOf[CheckpointedDrmSpark[K]]
   /** Spark matrix customization exposure */
-  def rdd = sparkDrm.rddInput.toDrmRdd()
+  def rdd:DrmRdd[K] = sparkDrm.rddInput.toDrmRdd()
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
index e18d6da..b793098 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
@@ -37,15 +37,15 @@ package object drm {
   private[drm] final val log = Logger.getLogger("org.apache.mahout.sparkbindings");
-  private[sparkbindings] implicit def cpDrm2DrmRddInput[K: ClassTag](cp: CheckpointedDrmSpark[K]): DrmRddInput[K] =
+  private[sparkbindings] implicit def cpDrm2DrmRddInput[K](cp: CheckpointedDrmSpark[K]): DrmRddInput[K] =
     cp.rddInput
-  private[sparkbindings] implicit def cpDrmGeneric2DrmRddInput[K: ClassTag](cp: CheckpointedDrm[K]): DrmRddInput[K] =
+  private[sparkbindings] implicit def cpDrmGeneric2DrmRddInput[K](cp: CheckpointedDrm[K]): DrmRddInput[K] =
     cp.asInstanceOf[CheckpointedDrmSpark[K]]
-  private[sparkbindings] implicit def drmRdd2drmRddInput[K: ClassTag](rdd: DrmRdd[K]) = new DrmRddInput[K](Left(rdd))
+  private[sparkbindings] implicit def drmRdd2drmRddInput[K:ClassTag](rdd: DrmRdd[K]) = new DrmRddInput[K](Left(rdd))
-  private[sparkbindings] implicit def blockifiedRdd2drmRddInput[K: ClassTag](rdd: BlockifiedDrmRdd[K]) = new
+  private[sparkbindings] implicit def blockifiedRdd2drmRddInput[K:ClassTag](rdd: BlockifiedDrmRdd[K]) = new
       DrmRddInput[K](
     Right(rdd))

http://git-wip-us.apache.org/repos/asf/mahout/blob/e8b9c800/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index 330ae38..de309c3 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -108,10 +108,10 @@ package object sparkbindings {
   implicit def sb2bc[T](b: Broadcast[T]): BCast[T] = new SparkBCast(b)
   /** Adding Spark-specific ops */
-  implicit def cpDrm2cpDrmSparkOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedDrmSparkOps[K] =
+  implicit def cpDrm2cpDrmSparkOps[K](drm: CheckpointedDrm[K]): CheckpointedDrmSparkOps[K] =
     new CheckpointedDrmSparkOps[K](drm)
-  implicit def drm2cpDrmSparkOps[K: ClassTag](drm: DrmLike[K]): CheckpointedDrmSparkOps[K] = drm: CheckpointedDrm[K]
+  implicit def drm2cpDrmSparkOps[K](drm: DrmLike[K]): CheckpointedDrmSparkOps[K] = drm: CheckpointedDrm[K]
   private[sparkbindings] implicit def m2w(m: Matrix): MatrixWritable = new MatrixWritable(m)
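Editor's note on the `colSums` / `numNonZeroElementsPerColumn` hunks above: the in-code comment "Fold() doesn't work with kryo still" refers to folding inside `mapPartitions` and then reducing the per-partition accumulators, instead of calling `RDD.fold` with a Kryo-serialized zero value. A minimal sketch of the same shape in plain Spark, with `Array[Double]` standing in for Mahout's `Vector` (assumed helper name `columnSums`):

    import org.apache.spark.rdd.RDD

    // Work-around sketch: fold each partition locally, then reduce the per-partition
    // accumulators; avoids shipping a zero value through RDD.fold.
    def columnSums(rows: RDD[Array[Double]], ncol: Int): Array[Double] =
      rows
        .mapPartitions { it =>
          val acc = new Array[Double](ncol)
          it.foreach { v => var i = 0; while (i < ncol) { acc(i) += v(i); i += 1 } }
          Iterator(acc)                       // one accumulator per partition
        }
        .reduce { (a, b) => var i = 0; while (i < ncol) { a(i) += b(i); i += 1 }; a }

This mirrors the `mapPartitions(iter ⇒ { ... Iterator(acc) }).reduce(_ += _)` structure in the SparkEngine diff.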
