http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala index 595cd66..41e966b 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala @@ -24,51 +24,59 @@ import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark import scalabindings._ import RLikeOps._ import org.apache.mahout.math.drm.logical._ -import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSpark, DrmRddInput} +import org.apache.mahout.sparkbindings.drm.{cpDrmGeneric2DrmRddInput, CheckpointedDrmSpark, DrmRddInput} import org.apache.mahout.math._ +import scala.Predef import scala.reflect.ClassTag +import scala.reflect.classTag import org.apache.spark.storage.StorageLevel import org.apache.mahout.sparkbindings.blas._ import org.apache.hadoop.io._ -import scala.Some -import scala.collection.JavaConversions._ +import collection._ +import JavaConversions._ import org.apache.mahout.math.drm._ import org.apache.mahout.math.drm.RLikeDrmOps._ import org.apache.spark.rdd.RDD import org.apache.mahout.common.{Hadoop1HDFSUtil, HDFSUtil} + /** Spark-specific non-drm-method operations */ object SparkEngine extends DistributedEngine { // By default, use Hadoop 1 utils var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil - def colSums[K:ClassTag](drm: CheckpointedDrm[K]): Vector = { + def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector = { val n = drm.ncol drm.rdd + // Throw away keys .map(_._2) + // Fold() doesn't work with kryo still. So work around it. - .mapPartitions(iter => { - val acc = ((new DenseVector(n): Vector) /: iter)((acc, v) => acc += v) + .mapPartitions(iter â { + val acc = ((new DenseVector(n): Vector) /: iter)((acc, v) â acc += v) Iterator(acc) }) + // Since we preallocated new accumulator vector per partition, this must not cause any side // effects now. .reduce(_ += _) } - def numNonZeroElementsPerColumn[K:ClassTag](drm: CheckpointedDrm[K]): Vector = { + def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = { val n = drm.ncol drm.rdd + // Throw away keys .map(_._2) + // Fold() doesn't work with kryo still. So work around it. - .mapPartitions(iter => { - val acc = ((new DenseVector(n): Vector) /: iter) { (acc, v) => - v.nonZeroes().foreach { elem => acc(elem.index) += 1 } + .mapPartitions(iter â { + val acc = ((new DenseVector(n): Vector) /: iter) { (acc, v) â + v.nonZeroes().foreach { elem â acc(elem.index) += 1} acc } Iterator(acc) @@ -79,17 +87,25 @@ object SparkEngine extends DistributedEngine { } /** Engine-specific colMeans implementation based on a checkpoint. */ - override def colMeans[K:ClassTag](drm: CheckpointedDrm[K]): Vector = + override def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector = if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow override def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double = drm.rdd - // Compute sum of squares of each vector - .map { - case (key, v) => v dot v + // Compute sum of squares of each vector + .map { + case (key, v) â v dot v } - .reduce(_ + _) + .reduce(_ + _) + + /** Optional engine-specific all reduce tensor operation. */ + override def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: + BlockReduceFunc): Matrix = { + + import drm._ + drm.toBlockifiedDrmRdd(ncol = drm.ncol).map(bmf(_)).reduce(rf) + } /** * Perform default expression rewrite. Return physical plan that we can pass to exec(). <P> @@ -104,10 +120,10 @@ object SparkEngine extends DistributedEngine { def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = { // Spark-specific Physical Plan translation. - val rdd = tr2phys(plan) + val rddInput = tr2phys(plan) val newcp = new CheckpointedDrmSpark( - rdd = rdd, + rddInput = rddInput, _nrow = plan.nrow, _ncol = plan.ncol, _cacheStorageLevel = cacheHint2Spark(ch), @@ -131,7 +147,13 @@ object SparkEngine extends DistributedEngine { * * @return DRM[Any] where Any is automatically translated to value type */ - def drmDfsRead (path: String, parMin:Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = { + def drmDfsRead(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = { + + // Require that context is actually Spark context. + require(sc.isInstanceOf[SparkDistributedContext], "Supplied context must be for the Spark backend.") + + // Extract spark context -- we need it for some operations. + implicit val ssc = sc.asInstanceOf[SparkDistributedContext].sc val drmMetadata = hdfsUtils.readDrmHeader(path) val k2vFunc = drmMetadata.keyW2ValFunc @@ -140,8 +162,8 @@ object SparkEngine extends DistributedEngine { // Hadoop we must do it right after read operation). val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minPartitions = parMin) - // Immediately convert keys and value writables into value types. - .map { case (wKey, wVec) => k2vFunc(wKey) -> wVec.get()} + // Immediately convert keys and value writables into value types. + .map { case (wKey, wVec) â k2vFunc(wKey) -> wVec.get()} // Wrap into a DRM type with correct matrix row key class tag evident. drmWrap(rdd = rdd, cacheHint = CacheHint.NONE)(drmMetadata.keyClassTag.asInstanceOf[ClassTag[Any]]) @@ -149,67 +171,141 @@ object SparkEngine extends DistributedEngine { /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */ def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1) - (implicit sc: DistributedContext) + (implicit sc: DistributedContext) : CheckpointedDrm[Int] = { - new CheckpointedDrmSpark(rdd = parallelizeInCore(m, numPartitions)) + new CheckpointedDrmSpark(rddInput = parallelizeInCore(m, numPartitions), _nrow = m.nrow, _ncol = m.ncol) } private[sparkbindings] def parallelizeInCore(m: Matrix, numPartitions: Int = 1) - (implicit sc: DistributedContext): DrmRdd[Int] = { + (implicit sc: DistributedContext): DrmRdd[Int] = { - val p = (0 until m.nrow).map(i => i -> m(i, ::)) + val p = (0 until m.nrow).map(i => i â m(i, ::)) sc.parallelize(p, numPartitions) } /** Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys. */ def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1) - (implicit sc: DistributedContext) + (implicit sc: DistributedContext) : CheckpointedDrm[String] = { val rb = m.getRowLabelBindings - val p = for (i: String <- rb.keySet().toIndexedSeq) yield i -> m(rb(i), ::) + val p = for (i: String â rb.keySet().toIndexedSeq) yield i â m(rb(i), ::) - new CheckpointedDrmSpark(rdd = sc.parallelize(p, numPartitions)) + new CheckpointedDrmSpark(rddInput = sc.parallelize(p, numPartitions), _nrow = m.nrow, _ncol = m.ncol) } /** This creates an empty DRM with specified number of partitions and cardinality. */ def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10) - (implicit sc: DistributedContext): CheckpointedDrm[Int] = { - val rdd = sc.parallelize(0 to numPartitions, numPartitions).flatMap(part => { + (implicit sc: DistributedContext): CheckpointedDrm[Int] = { + val rdd = sc.parallelize(0 to numPartitions, numPartitions).flatMap(part â { val partNRow = (nrow - 1) / numPartitions + 1 val partStart = partNRow * part val partEnd = Math.min(partStart + partNRow, nrow) - for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector) + for (i â partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector) }) new CheckpointedDrmSpark[Int](rdd, nrow, ncol) } def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10) - (implicit sc: DistributedContext): CheckpointedDrm[Long] = { - val rdd = sc.parallelize(0 to numPartitions, numPartitions).flatMap(part => { + (implicit sc: DistributedContext): CheckpointedDrm[Long] = { + val rdd = sc.parallelize(0 to numPartitions, numPartitions).flatMap(part â { val partNRow = (nrow - 1) / numPartitions + 1 val partStart = partNRow * part val partEnd = Math.min(partStart + partNRow, nrow) - for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector) + for (i â partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector) }) new CheckpointedDrmSpark[Long](rdd, nrow, ncol) } + /** + * Convert non-int-keyed matrix to an int-keyed, computing optionally mapping from old keys + * to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix. + */ + override def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) = { + if (classTag[K] == ClassTag.Int) { + drmX.asInstanceOf[DrmLike[Int]] â None + } else { + + val drmXcp = drmX.checkpoint(CacheHint.MEMORY_ONLY) + val ncol = drmXcp.asInstanceOf[CheckpointedDrmSpark[K]]._ncol + val nrow = drmXcp.asInstanceOf[CheckpointedDrmSpark[K]]._nrow + + // Compute sequential int key numbering. + val (intRdd, keyMap) = blas.rekeySeqInts(rdd = drmXcp.rdd, computeMap = computeMap) + + // Convert computed key mapping to a matrix. + val mxKeyMap = keyMap.map { rdd => + drmWrap(rdd = rdd.map { case (key, ordinal) â key â (dvec(ordinal):Vector)}, ncol = 1, nrow = nrow) + } + + + drmWrap(rdd = intRdd, ncol = ncol) â mxKeyMap + } + + } + + + /** + * (Optional) Sampling operation. Consistent with Spark semantics of the same. + * @param drmX + * @param fraction + * @param replacement + * @tparam K + * @return + */ + override def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean): DrmLike[K] = { + + // We do want to take ncol if already computed, if not, then we don't want to trigger computation + // here. + val ncol = drmX match { + case cp: CheckpointedDrmSpark[K] â cp._ncol + case _ â -1 + } + val sample = drmX.rdd.sample(withReplacement = replacement, fraction = fraction) + if (classTag[K] != ClassTag.Int) return drmWrap(sample, ncol = ncol) + + // K == Int: Int-keyed sample. rebase int counts. + drmWrap(rdd = blas.rekeySeqInts(rdd = sample, computeMap = false)._1, ncol = ncol).asInstanceOf[DrmLike[K]] + } + + + override def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples: Int, replacement: Boolean): Matrix = { + + val ncol = drmX match { + case cp: CheckpointedDrmSpark[K] â cp._ncol + case _ â -1 + } + + // I think as of the time of this writing, takeSample() in Spark is biased. It is not a true + // hypergeometric sampler. But it is faster than a true hypergeometric/categorical samplers + // would be. + val sample = drmX.rdd.takeSample(withReplacement = replacement, num = numSamples) + val isSparse = sample.exists { case (_, vec) â !vec.isDense } + + val vectors = sample.map(_._2) + val labels = sample.view.zipWithIndex.map { case ((key, _), idx) â key.toString â (idx:Integer) }.toMap + + val mx:Matrix = if (isSparse) sparse(vectors:_*) else dense(vectors) + mx.setRowLabelBindings(labels) + + mx + } + private[mahout] def cacheHint2Spark(cacheHint: CacheHint.CacheHint): StorageLevel = cacheHint match { - case CacheHint.NONE => StorageLevel.NONE - case CacheHint.DISK_ONLY => StorageLevel.DISK_ONLY - case CacheHint.DISK_ONLY_2 => StorageLevel.DISK_ONLY_2 - case CacheHint.MEMORY_ONLY => StorageLevel.MEMORY_ONLY - case CacheHint.MEMORY_ONLY_2 => StorageLevel.MEMORY_ONLY_2 - case CacheHint.MEMORY_ONLY_SER => StorageLevel.MEMORY_ONLY_SER - case CacheHint.MEMORY_ONLY_SER_2 => StorageLevel.MEMORY_ONLY_SER_2 - case CacheHint.MEMORY_AND_DISK => StorageLevel.MEMORY_AND_DISK - case CacheHint.MEMORY_AND_DISK_2 => StorageLevel.MEMORY_AND_DISK_2 - case CacheHint.MEMORY_AND_DISK_SER => StorageLevel.MEMORY_AND_DISK_SER - case CacheHint.MEMORY_AND_DISK_SER_2 => StorageLevel.MEMORY_AND_DISK_SER_2 + case CacheHint.NONE â StorageLevel.NONE + case CacheHint.DISK_ONLY â StorageLevel.DISK_ONLY + case CacheHint.DISK_ONLY_2 â StorageLevel.DISK_ONLY_2 + case CacheHint.MEMORY_ONLY â StorageLevel.MEMORY_ONLY + case CacheHint.MEMORY_ONLY_2 â StorageLevel.MEMORY_ONLY_2 + case CacheHint.MEMORY_ONLY_SER â StorageLevel.MEMORY_ONLY_SER + case CacheHint.MEMORY_ONLY_SER_2 â StorageLevel.MEMORY_ONLY_SER_2 + case CacheHint.MEMORY_AND_DISK â StorageLevel.MEMORY_AND_DISK + case CacheHint.MEMORY_AND_DISK_2 â StorageLevel.MEMORY_AND_DISK_2 + case CacheHint.MEMORY_AND_DISK_SER â StorageLevel.MEMORY_AND_DISK_SER + case CacheHint.MEMORY_AND_DISK_SER_2 â StorageLevel.MEMORY_AND_DISK_SER_2 } /** Translate previously optimized physical plan */ @@ -221,31 +317,32 @@ object SparkEngine extends DistributedEngine { // If there are any such cases, they must go away in pass1. If they were not, then it wasn't // the A'A case but actual transposition intent which should be removed from consideration // (we cannot do actual flip for non-int-keyed arguments) - case OpAtAnyKey(_) => + case OpAtAnyKey(_) â throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.") - case op@OpAt(a) => At.at(op, tr2phys(a)(op.classTagA)) - case op@OpABt(a, b) => ABt.abt(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB)) - case op@OpAtB(a, b) => AtB.atb_nograph(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB), - zippable = a.partitioningTag == b.partitioningTag) - case op@OpAtA(a) => AtA.at_a(op, tr2phys(a)(op.classTagA)) - case op@OpAx(a, x) => Ax.ax_with_broadcast(op, tr2phys(a)(op.classTagA)) - case op@OpAtx(a, x) => Ax.atx_with_broadcast(op, tr2phys(a)(op.classTagA)) - case op@OpAewB(a, b, opId) => AewB.a_ew_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB)) - case op@OpCbind(a, b) => CbindAB.cbindAB_nograph(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB)) - case op@OpRbind(a, b) => RbindAB.rbindAB(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB)) - case op@OpAewScalar(a, s, _) => AewB.a_ew_scalar(op, tr2phys(a)(op.classTagA), s) - case op@OpRowRange(a, _) => Slicing.rowRange(op, tr2phys(a)(op.classTagA)) - case op@OpTimesRightMatrix(a, _) => AinCoreB.rightMultiply(op, tr2phys(a)(op.classTagA)) + case op@OpAt(a) â At.at(op, tr2phys(a)(op.classTagA)) + case op@OpABt(a, b) â ABt.abt(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB)) + case op@OpAtB(a, b) â AtB.atb(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB)) + case op@OpAtA(a) â AtA.at_a(op, tr2phys(a)(op.classTagA)) + case op@OpAx(a, x) â Ax.ax_with_broadcast(op, tr2phys(a)(op.classTagA)) + case op@OpAtx(a, x) â Ax.atx_with_broadcast(op, tr2phys(a)(op.classTagA)) + case op@OpAewUnaryFunc(a, _, _) â AewB.a_ew_func(op, tr2phys(a)(op.classTagA)) + case op@OpAewUnaryFuncFusion(a, _) â AewB.a_ew_func(op, tr2phys(a)(op.classTagA)) + case op@OpAewB(a, b, opId) â AewB.a_ew_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB)) + case op@OpCbind(a, b) â CbindAB.cbindAB_nograph(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB)) + case op@OpCbindScalar(a, _, _) â CbindAB.cbindAScalar(op, tr2phys(a)(op.classTagA)) + case op@OpRbind(a, b) â RbindAB.rbindAB(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB)) + case op@OpAewScalar(a, s, _) â AewB.a_ew_scalar(op, tr2phys(a)(op.classTagA), s) + case op@OpRowRange(a, _) â Slicing.rowRange(op, tr2phys(a)(op.classTagA)) + case op@OpTimesRightMatrix(a, _) â AinCoreB.rightMultiply(op, tr2phys(a)(op.classTagA)) // Custom operators, we just execute them - case blockOp: OpMapBlock[K, _] => MapBlock.exec( + case blockOp: OpMapBlock[K, _] â MapBlock.exec( src = tr2phys(blockOp.A)(blockOp.classTagA), - ncol = blockOp.ncol, - bmf = blockOp.bmf + operator = blockOp ) - case op@OpPar(a,_,_) => Par.exec(op,tr2phys(a)(op.classTagA)) - case cp: CheckpointedDrm[K] => new DrmRddInput[K](rowWiseSrc = Some((cp.ncol, cp.rdd))) - case _ => throw new IllegalArgumentException("Internal:Optimizer has no exec policy for operator %s." - .format(oper)) + case op@OpPar(a, _, _) â Par.exec(op, tr2phys(a)(op.classTagA)) + case cp: CheckpointedDrm[K] â cp.rdd: DrmRddInput[K] + case _ â throw new IllegalArgumentException("Internal:Optimizer has no exec policy for operator %s." + .format(oper)) } }
http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala index 1e3f286..11e2bad 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala @@ -19,16 +19,23 @@ package org.apache.mahout.sparkbindings.blas import org.apache.mahout.math.scalabindings._ import RLikeOps._ +import org.apache.spark.rdd.RDD import scala.reflect.ClassTag import org.apache.mahout.sparkbindings._ -import drm._ -import org.apache.mahout.math.{Matrix, SparseRowMatrix} +import org.apache.mahout.math.drm.BlockifiedDrmTuple +import org.apache.mahout.sparkbindings.drm._ +import org.apache.mahout.math.{SparseMatrix, Matrix, SparseRowMatrix} import org.apache.spark.SparkContext._ import org.apache.mahout.math.drm.logical.OpABt +import org.apache.mahout.logging._ + +import scala.tools.nsc.io.Pickler.TildeDecorator /** Contains RDD plans for ABt operator */ object ABt { + private final implicit val log = getLog(ABt.getClass) + /** * General entry point for AB' operator. * @@ -40,8 +47,11 @@ object ABt { def abt[K: ClassTag]( operator: OpABt[K], srcA: DrmRddInput[K], - srcB: DrmRddInput[Int]): DrmRddInput[K] = + srcB: DrmRddInput[Int]): DrmRddInput[K] = { + + debug("operator AB'(Spark)") abt_nograph(operator, srcA, srcB) + } /** * Computes AB' without GraphX. @@ -63,7 +73,146 @@ object ABt { srcB: DrmRddInput[Int]): DrmRddInput[K] = { // Blockify everything. - val blocksA = srcA.toBlockifiedDrmRdd() + val blocksA = srcA.toBlockifiedDrmRdd(operator.A.ncol) + + val blocksB = srcB.toBlockifiedDrmRdd(operator.B.ncol) + + val prodNCol = operator.ncol + val prodNRow = operator.nrow + // We are actually computing AB' here. + val numProductPartitions = estimateProductPartitions(anrow = prodNRow, ancol = operator.A.ncol, + bncol = prodNCol, aparts = blocksA.partitions.size, bparts = blocksB.partitions.size) + + debug( + s"AB': #parts = $numProductPartitions; A #parts=${blocksA.partitions.size}, B #parts=${blocksB.partitions.size}."+ + s"A=${operator.A.nrow}x${operator.A.ncol}, B=${operator.B.nrow}x${operator.B.ncol},AB'=${prodNRow}x$prodNCol." + ) + + // blockwise multimplication function + def mmulFunc(tupleA: BlockifiedDrmTuple[K], tupleB: BlockifiedDrmTuple[Int]): (Array[K], Array[Int], Matrix) = { + val (keysA, blockA) = tupleA + val (keysB, blockB) = tupleB + + var ms = traceDo(System.currentTimeMillis()) + + // We need to send keysB to the aggregator in order to know which columns are being updated. + val result = (keysA, keysB, (blockA %*% blockB.t)) + + ms = traceDo(System.currentTimeMillis() - ms.get) + trace( + s"block multiplication of(${blockA.nrow}x${blockA.ncol} x ${blockB.ncol}x${blockB.nrow} is completed in $ms " + + "ms.") + trace(s"block multiplication types: blockA: ${blockA.getClass.getName}(${blockA.t.getClass.getName}); " + + s"blockB: ${blockB.getClass.getName}.") + + result + } + + val blockwiseMmulRdd = + + // Combine blocks pairwise. + pairwiseApply(blocksA, blocksB, mmulFunc _) + + // Now reduce proper product blocks. + .combineByKey( + + // Empty combiner += value + createCombiner = (t: (Array[K], Array[Int], Matrix)) => { + val (rowKeys, colKeys, block) = t + val comb = new SparseMatrix(prodNCol, block.nrow).t + + for ((col, i) <- colKeys.zipWithIndex) comb(::, col) := block(::, i) + rowKeys -> comb + }, + + // Combiner += value + mergeValue = (comb: (Array[K], Matrix), value: (Array[K], Array[Int], Matrix)) => { + val (rowKeys, c) = comb + val (_, colKeys, block) = value + for ((col, i) <- colKeys.zipWithIndex) c(::, col) := block(::, i) + comb + }, + + // Combiner + Combiner + mergeCombiners = (comb1: (Array[K], Matrix), comb2: (Array[K], Matrix)) => { + comb1._2 += comb2._2 + comb1 + }, + + numPartitions = blocksA.partitions.size max blocksB.partitions.size + ) + + + // Created BlockifiedRDD-compatible structure. + val blockifiedRdd = blockwiseMmulRdd + + // throw away A-partition # + .map{case (_,tuple) => tuple} + + val numPartsResult = blockifiedRdd.partitions.size + + // See if we need to rebalance away from A granularity. + if (numPartsResult * 2 < numProductPartitions || numPartsResult / 2 > numProductPartitions) { + + debug(s"Will re-coalesce from ${numPartsResult} to ${numProductPartitions}") + + val rowRdd = deblockify(blockifiedRdd).coalesce(numPartitions = numProductPartitions) + + rowRdd + + } else { + + // We don't have a terribly different partition + blockifiedRdd + } + + } + + /** + * This function tries to use join instead of cartesian to group blocks together without bloating + * the number of partitions. Hope is that we can apply pairwise reduction of block pair right away + * so if the data to one of the join parts is streaming, the result is still fitting to memory, + * since result size is much smaller than the operands. + * + * @param blocksA blockified RDD for A + * @param blocksB blockified RDD for B + * @param blockFunc a function over (blockA, blockB). Implies `blockA %*% blockB.t` but perhaps may be + * switched to another scheme based on which of the sides, A or B, is bigger. + */ + private def pairwiseApply[K1, K2, T](blocksA: BlockifiedDrmRdd[K1], blocksB: BlockifiedDrmRdd[K2], blockFunc: + (BlockifiedDrmTuple[K1], BlockifiedDrmTuple[K2]) => T): RDD[(Int, T)] = { + + // We will be joining blocks in B to blocks in A using A-partition as a key. + + // Prepare A side. + val blocksAKeyed = blocksA.mapPartitionsWithIndex { (part, blockIter) => + + val r = if (blockIter.hasNext) Some(part -> blockIter.next) else Option.empty[(Int, BlockifiedDrmTuple[K1])] + + require(blockIter.hasNext == false, s"more than 1 (${blockIter.size + 1}) blocks per partition and A of AB'") + + r.toIterator + } + + // Prepare B-side. + val aParts = blocksA.partitions.size + val blocksBKeyed = blocksB.flatMap(bTuple => for (blockKey <- (0 until aParts).view) yield blockKey -> bTuple ) + + // Perform the inner join. Let's try to do a simple thing now. + blocksAKeyed.join(blocksBKeyed, numPartitions = aParts) + + // Apply product function which should produce smaller products. Hopefully, this streams blockB's in + .map{case (partKey,(blockA, blockB)) => partKey -> blockFunc(blockA, blockB)} + + } + + private[blas] def abt_nograph_cart[K: ClassTag]( + operator: OpABt[K], + srcA: DrmRddInput[K], + srcB: DrmRddInput[Int]): DrmRddInput[K] = { + + // Blockify everything. + val blocksA = srcA.toBlockifiedDrmRdd(operator.A.ncol) // Mark row-blocks with group id .mapPartitionsWithIndex((part, iter) => { @@ -83,28 +232,35 @@ object ABt { } }) - val blocksB = srcB.toBlockifiedDrmRdd() + val blocksB = srcB.toBlockifiedDrmRdd(operator.B.ncol) // Final product's geometry. We want to extract that into local variables since we want to use // them as closure attributes. val prodNCol = operator.ncol val prodNRow = operator.nrow - - // Approximate number of final partitions. - val numProductPartitions = - if (blocksA.partitions.size > blocksB.partitions.size) { - ((prodNCol.toDouble / operator.A.ncol) * blocksA.partitions.size).ceil.toInt - } else { - ((prodNRow.toDouble / operator.B.ncol) * blocksB.partitions.size).ceil.toInt - } + val aNCol = operator.A.ncol + + // Approximate number of final partitions. We take bigger partitions as our guide to number of + // elements per partition. TODO: do it better. - //srcA.partitions.size.max(that = srcB.partitions.size) + // Elements per partition, bigger of two operands. + val epp = aNCol.toDouble * prodNRow / blocksA.partitions.size max aNCol.toDouble * prodNCol / + blocksB.partitions.size + // Number of partitions we want to converge to in the product. For now we simply extrapolate that + // assuming product density and operand densities being about the same; and using the same element + // per partition number in the product as the bigger of two operands. + val numProductPartitions = (prodNCol.toDouble * prodNRow / epp).ceil.toInt + + debug( + s"AB': #parts = $numProductPartitions; A #parts=${blocksA.partitions.size}, B #parts=${blocksB.partitions.size}.") // The plan. - var blockifiedRdd :BlockifiedDrmRdd[K] = blocksA + var blockifiedRdd: BlockifiedDrmRdd[K] = blocksA - // Build Cartesian. It may require a bit more memory there at tasks. + // Build Cartesian. It generates a LOT of tasks. TODO: figure how to fix performance of AB' + // operator. The thing is that product after map is really small one (partition fraction x + // partition fraction) so they can be combined into much bigger chunks. .cartesian(blocksB) // Multiply blocks @@ -126,10 +282,14 @@ object ABt { .combineByKey[(Array[K],Matrix)]( createCombiner = (t: (Array[K], Array[Int], Matrix)) => t match { + + // Create combiner structure out of two products. Our combiner is sparse row matrix + // initialized to final product partition block dimensions. case (rowKeys, colKeys, blockProd) => - // Accumulator is a row-wise block of sparse vectors. - val acc:Matrix = new SparseRowMatrix(rowKeys.size, prodNCol) + // Accumulator is a row-wise block of sparse vectors. Since we assign to columns, + // the most efficient is perhaps to create column-oriented block here. + val acc:Matrix = new SparseRowMatrix(prodNCol, rowKeys.size).t // Update accumulator using colKeys as column index indirection colKeys.view.zipWithIndex.foreach({ @@ -168,6 +328,8 @@ object ABt { // having at most one block per partition. blockifiedRdd = rbind(blockifiedRdd) - new DrmRddInput(blockifiedSrc = Some(blockifiedRdd)) + blockifiedRdd } + + } http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala index 3cdb797..8a90398 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala @@ -20,19 +20,22 @@ package org.apache.mahout.sparkbindings.blas import org.apache.mahout.sparkbindings.drm.DrmRddInput import scala.reflect.ClassTag import org.apache.spark.SparkContext._ -import org.apache.mahout.math.scalabindings._ +import org.apache.mahout.math._ +import scalabindings._ import RLikeOps._ import org.apache.mahout.math.{SequentialAccessSparseVector, Matrix, Vector} -import org.apache.mahout.math.drm.logical.{OpAewScalar, OpAewB} -import org.apache.log4j.Logger +import org.apache.mahout.math.drm.logical.{AbstractUnaryOp, TEwFunc, OpAewScalar, OpAewB} import org.apache.mahout.sparkbindings.blas.AewB.{ReduceFuncScalar, ReduceFunc} import org.apache.mahout.sparkbindings.{BlockifiedDrmRdd, DrmRdd, drm} import org.apache.mahout.math.drm._ +import org.apache.mahout.logging._ +import collection._ +import JavaConversions._ /** Elementwise drm-drm operators */ object AewB { - private val log = Logger.getLogger(AewB.getClass) + private final implicit val log = getLog(AewB.getClass) /** * Set to false to disallow in-place elementwise operations in case side-effects and non-idempotent @@ -44,10 +47,10 @@ object AewB { type ReduceFuncScalar = (Matrix, Double) => Matrix - private[blas] def getEWOps() = { - val inplaceProp = System.getProperty(PROPERTY_AEWB_INPLACE, "true").toBoolean - if (inplaceProp) InplaceEWOps else CloningEWOps - } + private[blas] def ewInplace(): Boolean = System.getProperty(PROPERTY_AEWB_INPLACE, "false").toBoolean + + private[blas] def getEWOps() = if (ewInplace()) InplaceEWOps else CloningEWOps + /** Elementwise matrix-matrix operator, now handles both non- and identically partitioned */ def a_ew_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = { @@ -67,12 +70,14 @@ object AewB { val a = srcA.toDrmRdd() val b = srcB.toDrmRdd() + debug(s"A${op.op}B: #partsA=${a.partitions.size},#partsB=${b.partitions.size}.") + // Check if A and B are identically partitioned AND keyed. if they are, then just perform zip // instead of join, and apply the op map-side. Otherwise, perform join and apply the op // reduce-side. val rdd = if (op.isIdenticallyPartitioned(op.A)) { - log.debug("applying zipped elementwise") + debug(s"A${op.op}B:applying zipped elementwise") a .zip(b) @@ -83,7 +88,7 @@ object AewB { } } else { - log.debug("applying elementwise as join") + debug("A${op.op}B:applying elementwise as join") a // Full outer-join operands row-wise @@ -103,13 +108,51 @@ object AewB { }) } - new DrmRddInput(rowWiseSrc = Some(ncol -> rdd)) + rdd + } + + def a_ew_func[K:ClassTag](op:AbstractUnaryOp[K,K] with TEwFunc, srcA: DrmRddInput[K]):DrmRddInput[K] = { + + val evalZeros = op.evalZeros + val inplace = ewInplace() + val f = op.f + + // Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing + // rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar. + val aBlockRdd = if (implicitly[ClassTag[K]] == ClassTag.Int && op.A.canHaveMissingRows && evalZeros) { + val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]]) + drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]] + } else { + srcA.toBlockifiedDrmRdd(op.A.ncol) + } + + val rdd = aBlockRdd.map {case (keys, block) => + + // Do inplace or allocate a new copy? + val newBlock = if (inplace) block else block cloned + + // Operation cares about zeros? + if (evalZeros) { + + // Yes, we evaluate all: + newBlock := ((_,_,x)=>f(x)) + } else { + + // No, evaluate non-zeros only row-wise + for (row <- newBlock; el <- row.nonZeroes) el := f(el.get) + } + + keys -> newBlock + } + + rdd } /** Physical algorithm to handle matrix-scalar operators like A - s or s -: A */ def a_ew_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double): DrmRddInput[K] = { + val ewOps = getEWOps() val opId = op.op @@ -129,15 +172,17 @@ object AewB { val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]]) drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]] } else { - srcA.toBlockifiedDrmRdd() + srcA.toBlockifiedDrmRdd(op.A.ncol) } + debug(s"A${op.op}$scalar: #parts=${aBlockRdd.partitions.size}.") + val rdd = aBlockRdd - .map({ + .map { case (keys, block) => keys -> reduceFunc(block, scalar) - }) + } - new DrmRddInput[K](blockifiedSrc = Some(rdd)) + rdd } } http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala index c923e62..5f9f84a 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala @@ -6,13 +6,17 @@ import scalabindings._ import RLikeOps._ import org.apache.mahout.sparkbindings._ import org.apache.mahout.sparkbindings.drm._ +import org.apache.mahout.logging._ import scala.reflect.ClassTag import org.apache.mahout.math.DiagonalMatrix import org.apache.mahout.math.drm.logical.OpTimesRightMatrix + /** Matrix product with one of operands an in-core matrix */ object AinCoreB { + private final implicit val log = getLog(AinCoreB.getClass) + def rightMultiply[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = { if ( op.right.isInstanceOf[DiagonalMatrix]) rightMultiply_diag(op, srcA) @@ -21,23 +25,27 @@ object AinCoreB { } private def rightMultiply_diag[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = { - val rddA = srcA.toBlockifiedDrmRdd() + val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol) implicit val ctx:DistributedContext = rddA.context val dg = drmBroadcast(op.right.viewDiagonal()) + debug(s"operator A %*% inCoreB-diagonal. #parts=${rddA.partitions.size}.") + val rdd = rddA // Just multiply the blocks .map { case (keys, blockA) => keys -> (blockA %*%: diagv(dg)) } - new DrmRddInput(blockifiedSrc = Some(rdd)) + rdd } private def rightMultiply_common[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = { - val rddA = srcA.toBlockifiedDrmRdd() + val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol) implicit val sc:DistributedContext = rddA.sparkContext + debug(s"operator A %*% inCoreB. #parts=${rddA.partitions.size}.") + val bcastB = drmBroadcast(m = op.right) val rdd = rddA @@ -46,7 +54,7 @@ object AinCoreB { case (keys, blockA) => keys -> (blockA %*% bcastB) } - new DrmRddInput(blockifiedSrc = Some(rdd)) + rdd } } http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala index 56de9f4..5789bd2 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala @@ -17,16 +17,20 @@ package org.apache.mahout.sparkbindings.blas -import org.apache.mahout.sparkbindings.drm.DrmRddInput +import org.apache.mahout.sparkbindings.drm._ import org.apache.mahout.math.scalabindings._ +import org.apache.mahout.logging._ import RLikeOps._ import org.apache.spark.SparkContext._ import org.apache.mahout.math.{DenseVector, Vector, SequentialAccessSparseVector} import org.apache.mahout.math.drm.logical.OpAt + /** A' algorithms */ object At { + private final implicit val log = getLog(At.getClass) + def at( operator: OpAt, srcA: DrmRddInput[Int]): DrmRddInput[Int] = at_nograph(operator = operator, srcA = srcA) @@ -39,10 +43,15 @@ object At { * groups into final rows of the transposed matrix. */ private[blas] def at_nograph(operator: OpAt, srcA: DrmRddInput[Int]): DrmRddInput[Int] = { - val drmRdd = srcA.toBlockifiedDrmRdd() + + debug("operator A'.") + + val drmRdd = srcA.toBlockifiedDrmRdd(operator.A.ncol) val numPartitions = drmRdd.partitions.size val ncol = operator.ncol + debug(s"A' #parts = $numPartitions.") + // Validity of this conversion must be checked at logical operator level. val nrow = operator.nrow.toInt val atRdd = drmRdd @@ -70,7 +79,7 @@ object At { key -> v }).densify() - new DrmRddInput(rowWiseSrc = Some(ncol -> atRdd)) + atRdd } } http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala index be4f08c..a212878 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala @@ -17,6 +17,7 @@ package org.apache.mahout.sparkbindings.blas +import org.apache.mahout.logging._ import org.apache.mahout.math._ import org.apache.mahout.sparkbindings._ import org.apache.mahout.sparkbindings.drm._ @@ -34,25 +35,30 @@ import SparkEngine._ */ object AtA { - final val log = Logger.getLogger(AtA.getClass) + private final implicit val log = getLog(AtA.getClass) final val PROPERTY_ATA_MAXINMEMNCOL = "mahout.math.AtA.maxInMemNCol" + final val PROPERTY_ATA_MMUL_BLOCKHEIGHT = "mahout.math.AtA.blockHeight" /** Materialize A'A operator */ def at_a(operator: OpAtA[_], srcRdd: DrmRddInput[_]): DrmRddInput[Int] = { - val maxInMemNCol = System.getProperty(PROPERTY_ATA_MAXINMEMNCOL, "2000").toInt + val maxInMemNCol = System.getProperty(PROPERTY_ATA_MAXINMEMNCOL, "200").toInt maxInMemNCol.ensuring(_ > 0, "Invalid A'A in-memory setting for optimizer") if (operator.ncol <= maxInMemNCol) { + // If we can comfortably fit upper-triangular operator into a map memory, we will run slim // algorithm with upper-triangular accumulators in maps. - val inCoreA = at_a_slim(srcRdd = srcRdd, operator = operator) + val inCoreA = at_a_slim(srcRdd = srcRdd.toDrmRdd(), operator = operator) val drmRdd = parallelizeInCore(inCoreA, numPartitions = 1)(sc = srcRdd.sparkContext) - new DrmRddInput(rowWiseSrc = Some(inCoreA.ncol, drmRdd)) + drmRdd + } else { + // Otherwise, we need to run a distributed, big version - new DrmRddInput(rowWiseSrc = Some(operator.ncol, at_a_nongraph(srcRdd = srcRdd, op = operator))) + // new DrmRddInput(rowWiseSrc = Some(operator.ncol, at_a_nongraph(srcRdd = srcRdd, op = operator))) + at_a_nongraph_mmul(srcRdd = srcRdd.toBlockifiedDrmRdd(operator.A.ncol), op = operator) } } @@ -64,7 +70,7 @@ object AtA { */ def at_a_slim(operator: OpAtA[_], srcRdd: DrmRdd[_]): Matrix = { - log.debug("Applying slim A'A.") + debug("operator slim A'A(Spark)") val ncol = operator.ncol // Compute backing vector of tiny-upper-triangular accumulator accross all the data. @@ -73,122 +79,195 @@ object AtA { val ut = new UpperTriangular(ncol) // Strategy is to add to an outer product of each row to the upper triangular accumulator. - pIter.foreach({ - case (k, v) => + pIter.foreach({ case (k, v) => - // Use slightly various traversal strategies over dense vs. sparse source. - if (v.isDense) { + // Use slightly various traversal strategies over dense vs. sparse source. + if (v.isDense) { - // Update upper-triangular pattern only (due to symmetry). - // Note: Scala for-comprehensions are said to be fairly inefficient this way, but this is - // such spectacular case they were deesigned for.. Yes I do observe some 20% difference - // compared to while loops with no other payload, but the other payload is usually much - // heavier than this overhead, so... I am keeping this as is for the time being. + // Update upper-triangular pattern only (due to symmetry). + // Note: Scala for-comprehensions are said to be fairly inefficient this way, but this is + // such spectacular case they were deesigned for.. Yes I do observe some 20% difference + // compared to while loops with no other payload, but the other payload is usually much + // heavier than this overhead, so... I am keeping this as is for the time being. - for (row <- 0 until v.length; col <- row until v.length) - ut(row, col) = ut(row, col) + v(row) * v(col) + for (row <- 0 until v.length; col <- row until v.length) + ut(row, col) = ut(row, col) + v(row) * v(col) - } else { + } else { - // Sparse source. - v.nonZeroes().view + // Sparse source. + v.nonZeroes().view - // Outer iterator iterates over rows of outer product. - .foreach(elrow => { + // Outer iterator iterates over rows of outer product. + .foreach(elrow => { - // Inner loop for columns of outer product. - v.nonZeroes().view + // Inner loop for columns of outer product. + v.nonZeroes().view - // Filter out non-upper nonzero elements from the double loop. - .filter(_.index >= elrow.index) + // Filter out non-upper nonzero elements from the double loop. + .filter(_.index >= elrow.index) - // Incrementally update outer product value in the uppper triangular accumulator. - .foreach(elcol => { + // Incrementally update outer product value in the uppper triangular accumulator. + .foreach(elcol => { - val row = elrow.index - val col = elcol.index - ut(row, col) = ut(row, col) + elrow.get() * elcol.get() + val row = elrow.index + val col = elcol.index + ut(row, col) = ut(row, col) + elrow.get() * elcol.get() - }) }) + }) - } + } }) Iterator(dvec(ddata = ut.getData): Vector) - }) - - .collect() - .reduce(_ += _) + }).collect().reduce(_ += _) new DenseSymmetricMatrix(resSym) } + // Version that tries to use groupBy. In practice this is the slowest. + def at_a_group(op: OpAtA[_], srcRdd: DrmRdd[_]): DrmRddInput[Int] = { + debug("operator non-slim A'A(Spark-group).") + + // Determine how many partitions the new matrix would need approximately. We base that on + // geometry only, but it may eventually not be that adequate. Indeed, A'A tends to be much more + // dense in reality than the source. + val m = op.A.nrow + val n = op.A.ncol + val srcNumParts = srcRdd.partitions.size + val finalNumParts = (srcNumParts * n / m).ceil.toInt max 1 + val numParts = finalNumParts max srcNumParts + val ranges = computeEvenSplits(n, numParts) + + var rddAtA = srcRdd + + // Remove key, key is irrelevant + .map(_._2) + + // Form partial outer blocks for each partition + .flatMap { v => + for (blockKey <- 0 until numParts) yield { + blockKey -> v + } + } + // Sent to individual partition reducer + .groupByKey(numPartitions = numParts) + + // Reduce individual group + .map { case (blockKey, iter) => + val range = ranges(blockKey) + val mxC: Matrix = new SparseRowMatrix(range.size, n, false) + iter.foreach(vec => addOuterProduct(mxC, vec(range), vec)) + + // Fix keys + val blockStart = range.start + val rowKeys = Array.tabulate(mxC.nrow)(blockStart + _) + rowKeys -> mxC + } + + if (log.isDebugEnabled) + log.debug(s"AtA (grouping) #parts: ${rddAtA.partitions.size}.") + + if (finalNumParts < numParts) rddAtA = rddAtA.coalesce(finalNumParts, shuffle = false) + + rddAtA + } + + /** The version of A'A that does not use GraphX */ - def at_a_nongraph(op: OpAtA[_], srcRdd: DrmRdd[_]): DrmRdd[Int] = { + def at_a_nongraph(op: OpAtA[_], srcRdd: DrmRdd[_]): DrmRddInput[Int] = { - log.debug("Applying non-slim non-graph A'A.") + debug("Applying non-slim non-graph A'A.") // Determine how many partitions the new matrix would need approximately. We base that on // geometry only, but it may eventually not be that adequate. Indeed, A'A tends to be much more // dense in reality than the source. - val m = op.A.nrow val n = op.A.ncol -/* possible fix for index out of range for vector range - val numParts = (srcRdd.partitions.size.toDouble * n / m).ceil.round.toInt max 1 + val numParts = (srcRdd.partitions.size.toDouble * n / m).ceil.toInt max 1 val blockHeight = (n - 1) / numParts + 1 -*/ - val numParts = (srcRdd.partitions.size.toDouble * n / m).ceil.round.toInt max 1 min n + val offsets = (0 until numParts).map(_ * blockHeight) + val ranges = offsets.map(offset => offset until (offset + blockHeight min n)) - // Computing evenly split ranges to denote each partition size. + val rddAtA = srcRdd - // Base size. - val baseSize = n / numParts + // Remove key, key is irrelevant + .map(_._2) - // How many partitions needs to be baseSize +1. - val slack = n - baseSize * numParts + // Form partial outer blocks for each partition + .flatMap { v => + for (blockKey <- 0 until numParts) yield { + blockKey ->(blockKey, v) + } + } + // Combine outer products + .combineByKey(// Combiner factory + createCombiner = (t: (Int, Vector)) => { + val partNo = t._1 + val vec = t._2 + val range = ranges(partNo) + val mxC = if (vec.isDense) new DenseMatrix(range.size, n) else new SparseRowMatrix(range.size, n) + addOuterProduct(mxC, vec(range), vec) + }, + + // Merge values into existing partition accumulator. + mergeValue = (mxC: Matrix, t: (Int, Vector)) => { + val partNo = t._1 + val vec = t._2 + addOuterProduct(mxC, vec(ranges(partNo)), vec) + }, + + // Merge combiners + mergeCombiners = (mxC1: Matrix, mxC2: Matrix) => mxC1 += mxC2, numPartitions = numParts) + + // Restore proper block keys + .map { case (blockKey, block) => + val blockStart = blockKey * blockHeight + val rowKeys = Array.tabulate(block.nrow)(blockStart + _) + rowKeys -> block + } - val ranges = - // Start with partition offsets... total numParts + 1. - (0 to numParts).view.map { i => (baseSize + 1) * i - (i - slack max 0)} - // And convert offsets to ranges. - .sliding(2).map(s => s(0) until s(1)).toIndexedSeq + if (log.isDebugEnabled) + log.debug(s"AtA #parts: ${rddAtA.partitions.size}.") - val rddAtA = srcRdd + rddAtA + } - // Remove key, key is irrelevant - .map(_._2) - - // Form partial outer blocks for each partition - .flatMap { - v => - for (blockKey <- Stream.range(0, numParts)) yield { -/* patch to fix index out of range for vector access - val blockStart = blockKey * blockHeight - val blockEnd = n min (blockStart + blockHeight) - blockKey -> (v(blockStart until blockEnd) cross v) -*/ - val range = ranges(blockKey) - blockKey -> (v(range) cross v) - } + /** + * The version of A'A that does not use GraphX. Tries to use blockwise matrix multiply. If an + * accelerated matrix back is available, this might be somewhat faster. + */ + def at_a_nongraph_mmul(op: OpAtA[_], srcRdd: BlockifiedDrmRdd[_]): DrmRddInput[Int] = { + + // Determine how many partitions the new matrix would need approximately. We base that on + // geometry only, but it may eventually not be that adequate. Indeed, A'A tends to be much more + // dense in reality than the source. + val m = op.A.nrow + val n = op.A.ncol + val aparts = srcRdd.partitions.size + val numParts = estimateProductPartitions(anrow = n, ancol = m, bncol = n, aparts = aparts, bparts = aparts) + val ranges = computeEvenSplits(n, numParts) + + debug(s"operator mmul-A'A(Spark); #parts = $numParts, #partsA=$aparts.") + + val rddAtA = srcRdd.flatMap { case (keys, block) => + Iterator.tabulate(numParts) { i => + i -> block(::, ranges(i)).t %*% block + } } - // Combine outer blocks - .reduceByKey(_ += _) - - // Restore proper block keys - .map { - case (blockKey, block) => -/* patch to fix index out of range for vector access - val blockStart = blockKey * blockHeight - val rowKeys = Array.tabulate(block.nrow)(blockStart + _) -*/ - val range = ranges(blockKey) - val rowKeys = Array.tabulate(block.nrow)(range.start + _) - rowKeys -> block + // Reduce partial blocks. + .reduceByKey(_ += _, numPartitions = numParts) + + // Produce keys + .map { case (blockKey, block) => + + val blockStart = ranges(blockKey).start + val rowKeys = Array.tabulate(block.nrow)(blockStart + _) + rowKeys -> block } - new DrmRddInput[Int](blockifiedSrc = Some(rddAtA)) + rddAtA } } http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala index 86aadc8..45705a9 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala @@ -17,8 +17,13 @@ package org.apache.mahout.sparkbindings.blas -import scala.reflect.ClassTag -import org.apache.mahout.math.drm._ +import reflect.ClassTag +import collection._ +import JavaConversions._ + +import org.apache.mahout.logging._ +import org.apache.mahout.math._ +import drm._ import org.apache.mahout.sparkbindings.drm._ import org.apache.spark.rdd.RDD import org.apache.mahout.math.scalabindings._ @@ -27,92 +32,330 @@ import org.apache.spark.SparkContext._ import org.apache.log4j.Logger import org.apache.mahout.math.drm.logical.OpAtB +import scala.collection.mutable.ArrayBuffer + object AtB { - private val log = Logger.getLogger(AtB.getClass) + private final implicit val log = getLog(AtB.getClass) + def atb[A: ClassTag](operator: OpAtB[A], srcA: DrmRddInput[A], srcB: DrmRddInput[A]): DrmRddInput[Int] = { + atb_nograph_mmul(operator, srcA, srcB, operator.A.partitioningTag == operator.B.partitioningTag) + } /** * The logic for computing A'B is pretty much map-side generation of partial outer product blocks * over co-grouped rows of A and B. If A and B are identically partitioned, we can just directly * zip all the rows. Otherwise, we need to inner-join them first. + * */ - def atb_nograph[A: ClassTag]( - operator: OpAtB[A], - srcA: DrmRddInput[A], - srcB: DrmRddInput[A], - zippable:Boolean = false - ): DrmRddInput[Int] = { + @deprecated("slow, will remove", since = "0.10.2") + def atb_nograph[A: ClassTag](operator: OpAtB[A], srcA: DrmRddInput[A], srcB: DrmRddInput[A], + zippable: Boolean = false): DrmRddInput[Int] = { val rddA = srcA.toDrmRdd() - val zipped = if ( zippable ) { + val rddB = srcB.toDrmRdd() + + + val prodNCol = operator.ncol + val prodNRow = operator.nrow + val aNRow = operator.A.nrow + + // Approximate number of final partitions. We take bigger partitions as our guide to number of + // elements per partition. TODO: do it better. + // Elements per partition, bigger of two operands. + val epp = aNRow.toDouble * prodNRow / rddA.partitions.size max aNRow.toDouble * prodNCol / + rddB.partitions.size + + // Number of partitions we want to converge to in the product. For now we simply extrapolate that + // assuming product density and operand densities being about the same; and using the same element + // per partition number in the product as the bigger of two operands. + val numProductPartitions = (prodNCol.toDouble * prodNRow / epp).ceil.toInt + + if (log.isDebugEnabled) log.debug(s"AtB: #parts ${numProductPartitions} for $prodNRow x $prodNCol geometry.") + + val zipped = if (zippable) { log.debug("A and B for A'B are identically distributed, performing row-wise zip.") - rddA.zip(other = srcB.toDrmRdd()) + rddA.zip(other = rddB) } else { log.debug("A and B for A'B are not identically partitioned, performing inner join.") - rddA.join(other=srcB.toDrmRdd()).map({ - case (key,(v1,v2) ) => (key -> v1) -> (key -> v2) + rddA.join(other = rddB, numPartitions = numProductPartitions).map({ case (key, (v1, + v2)) => (key -> v1) -> (key -> v2) }) } - val blockHeight = safeToNonNegInt( - (operator.B.ncol.toDouble/rddA.partitions.size).ceil.round max 1L - ) - - computeAtBZipped( - zipped, - nrow = operator.nrow, - ancol = operator.A.ncol, - bncol = operator.B.ncol, - blockHeight = blockHeight - ) + computeAtBZipped2(zipped, nrow = operator.nrow, ancol = operator.A.ncol, bncol = operator.B.ncol, + numPartitions = numProductPartitions) + } + + private[sparkbindings] def atb_nograph_mmul[A:ClassTag](operator:OpAtB[A], srcA: DrmRddInput[A], srcB:DrmRddInput[A], zippable:Boolean = false):DrmRddInput[Int] = { + + debug("operator mmul-A'B(Spark)") + + val prodNCol = operator.ncol + val prodNRow = safeToNonNegInt(operator.nrow) + val aNRow = safeToNonNegInt(operator.A.nrow) + + val rddA = srcA.toDrmRdd() + val rddB = srcB.toDrmRdd() + + // Approximate number of final partitions. We take bigger partitions as our guide to number of + // elements per partition. TODO: do it better. + // Elements per partition, bigger of two operands. + val epp = aNRow.toDouble * prodNRow / rddA.partitions.size max aNRow.toDouble * prodNCol / + rddB.partitions.size + + // Number of partitions we want to converge to in the product. For now we simply extrapolate that + // assuming product density and operand densities being about the same; and using the same element + // per partition number in the product as the bigger of two operands. + val numProductPartitions = (prodNCol.toDouble * prodNRow / epp).ceil.toInt min prodNRow + + if (log.isDebugEnabled) log.debug(s"AtB mmul: #parts ${numProductPartitions} for $prodNRow x $prodNCol geometry.") + + val zipped = if (zippable) { + + debug("mmul-A'B - zip: are identically distributed, performing row-wise zip.") + + val blockdRddA = srcA.toBlockifiedDrmRdd(operator.A.ncol) + val blockdRddB = srcB.toBlockifiedDrmRdd(operator.B.ncol) + + blockdRddA + + // Zip + .zip(other = blockdRddB) + + // Throw away the keys + .map { case ((_, blockA), (_, blockB)) => blockA -> blockB} + + } else { + + debug("mmul-A'B: cogroup for non-identically distributed stuff.") + + // To take same route, we'll join stuff row-wise, blockify it here and then proceed with the + // same computation path. Although it is possible we could shave off one shuffle here. TBD. + + rddA + + // Do full join. We can't get away with partial join because it is going to lose some rows + // in case we have missing rows on either side. + .cogroup(other = rddB, numPartitions = rddA.partitions.size max rddB.partitions.size ) + + + // Merge groups. + .mapPartitions{iter => + + val aRows = new ArrayBuffer[Vector](1000) + val bRows = new ArrayBuffer[Vector](1000) + + // Populate hanging row buffs + iter.foreach{case (_, (arowbag,browbag)) => + + // Some up all vectors, if any, for a row. If we have > 1 that means original matrix had + // non-uniquely keyed rows which is generally a matrix format inconsistency (should not + // happen). + aRows += (if (arowbag.isEmpty) + new SequentialAccessSparseVector(prodNRow) + else arowbag.reduce(_ += _)) + + bRows += (if (browbag.isEmpty) + new SequentialAccessSparseVector(prodNCol) + else browbag.reduce(_ += _)) + } + + // Transform collection of vectors into blocks. + val blockNRow = aRows.size + assert(blockNRow == bRows.size) + + val aBlock:Matrix = new SparseRowMatrix(blockNRow, prodNRow, aRows.toArray) + val bBlock:Matrix = new SparseRowMatrix(blockNRow, prodNCol, bRows.toArray) + + // Form pairwise result + Iterator(aBlock -> bBlock) + } + } + + computeAtBZipped3(pairwiseRdd = zipped, nrow = prodNRow, ancol = prodNRow, bncol = aNRow, + numPartitions = numProductPartitions) + + } + /** + * Compute, combine and accumulate outer products for every key. The incoming tuple structure + * is (partNo, (vecA, vecB)), so for every `partNo` we compute an outer product of the form {{{ + * vecA cross vecB + * }}} + * @param pairwiseRdd + * @return + */ + @deprecated("slow, will remove", since = "0.10.2") + private[sparkbindings] def combineOuterProducts(pairwiseRdd: RDD[(Int, (Vector, Vector))], numPartitions: Int) = { + + pairwiseRdd + + // Reduce individual partitions + .combineByKey(createCombiner = (t: (Vector, Vector)) => { + + val vecA = t._1 + val vecB = t._2 + + // Create partition accumulator. Generally, summation of outer products probably calls for + // dense accumulators. However, let's assume extremely sparse cases are still possible, and + // by default assume any sparse case is an extremely sparse case. May need to tweak further. + val mxC: Matrix = if (!vecA.isDense && !vecB.isDense) + new SparseRowMatrix(vecA.length, vecB.length) + else + new DenseMatrix(vecA.length, vecB.length) + + // Add outer product of arow and bRowFrag to mxC + addOuterProduct(mxC, vecA, vecB) + + }, mergeValue = (mxC: Matrix, t: (Vector, Vector)) => { + // Merge of a combiner with another outer product fragment. + val vecA = t._1 + val vecB = t._2 + + addOuterProduct(mxC, vecA, vecB) + + }, mergeCombiners = (mxC1: Matrix, mxC2: Matrix) => { + + // Merge of 2 combiners. + mxC1 += mxC2 + + }, numPartitions = numPartitions) + } + + private[sparkbindings] def computeAtBZipped3[A: ClassTag](pairwiseRdd: RDD[(Matrix, Matrix)], nrow: Int, + ancol: Int, bncol: Int, numPartitions: Int) = { + + val ranges = computeEvenSplits(nrow, numPartitions) + + val rdd = pairwiseRdd.flatMap{ case (blockA, blockB) â + + // Handling microscopic Pat's cases. Any slicing doesn't work well on 0-row matrix. This + // probably should be fixed in the in-core matrix implementations. + if (blockA.nrow == 0 ) + Iterator.empty + else + // Output each partial outer product with its correspondent partition #. + Iterator.tabulate(numPartitions) {part â + + val mBlock = blockA(::, ranges(part)).t %*% blockB + part â mBlock + } + } + + // Reduce. + .reduceByKey(_ += _, numPartitions = numPartitions) + + // Produce keys + .map { case (blockKey, block) â ranges(blockKey).toArray â block } + + debug(s"A'B mmul #parts: ${rdd.partitions.size}.") + + rdd } + private[sparkbindings] def computeAtBZipped2[A: ClassTag](zipped: RDD[(DrmTuple[A], DrmTuple[A])], nrow: Long, + ancol: Int, bncol: Int, numPartitions: Int) = { + + // The plan of this approach is to send a_i and parts of b_i to partitoin reducers which actually + // do outer product sum update locally (instead of sending outer blocks). Thus it should minimize + // expense for IO and also in-place partition block accum update should be much more efficient + // than forming outer block matrices and perform matrix-on-patrix +. + // Figure out appriximately block height per partition of the result. + val blockHeight = safeToNonNegInt((nrow - 1) / numPartitions) + 1 -// private[sparkbindings] def atb_nograph() + val partitionedRdd = zipped + + // Split B-rows into partitions using blockHeight + .mapPartitions { iter => + + val offsets = (0 until numPartitions).map(_ * blockHeight) + val ranges = offsets.map(offs => offs until (offs + blockHeight min ancol)) + + // Transform into series of (part -> (arow, part-brow)) tuples (keyed by part #). + iter.flatMap { case ((_, arow), (_, brow)) => + + ranges.view.zipWithIndex.map { case (arange, partNum) => + partNum -> (arow(arange).cloned -> brow) + } + } + } + + val blockRdd = combineOuterProducts(partitionedRdd, numPartitions) + + // Add ordinal row keys. + .map { case (blockNum, block) => + + // Starting key + var offset = blockNum * blockHeight + + var keys = Array.tabulate(block.nrow)(offset + _) + keys -> block + + } + + blockRdd + } /** Given already zipped, joined rdd of rows of A' and B, compute their product A'B */ - private[sparkbindings] def computeAtBZipped[A: ClassTag](zipped:RDD[(DrmTuple[A], DrmTuple[A])], - nrow:Long, ancol:Int, bncol:Int, blockHeight: Int) = { + @deprecated("slow, will remove", since = "0.10.2") + private[sparkbindings] def computeAtBZipped[A: ClassTag](zipped: RDD[(DrmTuple[A], DrmTuple[A])], nrow: Long, + ancol: Int, bncol: Int, numPartitions: Int) = { // Since Q and A are partitioned same way,we can just zip their rows and proceed from there by // forming outer products. Our optimizer lacks this primitive, so we will implement it using RDDs // directly. We try to compile B' = A'Q now by collecting outer products of rows of A and Q. At // this point we need to split n-range of B' into sutiable number of partitions. - val btNumParts = safeToNonNegInt((nrow - 1) / blockHeight + 1) + if (log.isDebugEnabled) { + log.debug(s"AtBZipped:zipped #parts ${zipped.partitions.size}") + log.debug(s"AtBZipped:Targeted #parts ${numPartitions}") + } + + // Figure out appriximately block height per partition of the result. + val blockHeight = safeToNonNegInt((nrow - 1) / numPartitions) + 1 val rddBt = zipped - // Produce outer product blocks - .flatMap { - case ((aKey, aRow), (qKey, qRow)) => - for (blockKey <- Stream.range(0, btNumParts)) yield { - val blockStart = blockKey * blockHeight - val blockEnd = ancol min (blockStart + blockHeight) + // Produce outer product blocks + .flatMap { case ((aKey, aRow), (qKey, qRow)) => + for (blockKey <- Stream.range(0, numPartitions)) yield { + val blockStart = blockKey * blockHeight + val blockEnd = ancol min (blockStart + blockHeight) - // Create block by cross product of proper slice of aRow and qRow - blockKey -> (aRow(blockStart until blockEnd) cross qRow) - } - } - // Combine blocks by just summing them up - .reduceByKey { - case (block1, block2) => block1 += block2 + // Create block by cross product of proper slice of aRow and qRow + blockKey -> (aRow(blockStart until blockEnd) cross qRow) + + // TODO: computing tons of cross product matrices seems to be pretty inefficient here. More + // likely single streaming algorithm of updates will perform much better here. So rewrite + // this using mapPartitions with numPartitions block accumulators. + + } } + // .combineByKey( + // createCombiner = (mx:Matrix) => mx, + // mergeValue = (c:Matrix,mx:Matrix) => c += mx, + // mergeCombiners = (c1:Matrix,c2:Matrix) => c1 += c2, + // numPartitions = numPartitions + // ) + // Doesn't look like use of combineByKey produces any better results than reduceByKey. So keeping + // reduceByKey for simplicity. Combiners probably doesn't mean reduceByKey doesn't combine map-side. + // Combine blocks by just summing them up + .reduceByKey((block1, block2) => block1 += block2, numPartitions) - // Throw away block key, generate row keys instead. - .map { - case (blockKey, block) => - val blockStart = blockKey * blockHeight - val rowKeys = Array.tabulate(block.nrow)(blockStart + _) - rowKeys -> block + // Throw away block key, generate row keys instead. + .map { case (blockKey, block) => + val blockStart = blockKey * blockHeight + val rowKeys = Array.tabulate(block.nrow)(blockStart + _) + rowKeys -> block } - new DrmRddInput[Int](blockifiedSrc = Some(rddBt)) + if (log.isDebugEnabled) log.debug(s"AtBZipped #parts ${rddBt.partitions.size}") + + rddBt } } http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala index 94c3f06..629accd 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala @@ -15,22 +15,22 @@ object Ax { def ax_with_broadcast[K: ClassTag](op: OpAx[K], srcA: DrmRddInput[K]): DrmRddInput[K] = { - val rddA = srcA.toBlockifiedDrmRdd() - implicit val sc:DistributedContext = rddA.sparkContext + val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol) + implicit val sc: DistributedContext = rddA.sparkContext val bcastX = drmBroadcast(op.x) - val rdd = rddA - // Just multiply the blocks - .map({ - case (keys, blockA) => keys -> (blockA %*% bcastX).toColMatrix - }) + val rdd: BlockifiedDrmRdd[K] = rddA + + // Just multiply the blocks + .map { case (keys, blockA) â keys â (blockA %*% bcastX).toColMatrix } - new DrmRddInput(blockifiedSrc = Some(rdd)) + new DrmRddInput(Right(rdd)) } def atx_with_broadcast(op: OpAtx, srcA: DrmRddInput[Int]): DrmRddInput[Int] = { - val rddA = srcA.toBlockifiedDrmRdd() + + val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol) implicit val dc:DistributedContext = rddA.sparkContext val bcastX = drmBroadcast(op.x) @@ -52,10 +52,10 @@ object Ax { // It is ridiculous, but in this scheme we will have to re-parallelize it again in order to plug // it back as drm blockified rdd - val rdd = dc.parallelize(Seq(inCoreM), numSlices = 1) - .map(block => Array.tabulate(block.nrow)(i => i) -> block) + val rdd:BlockifiedDrmRdd[Int] = dc.parallelize(Seq(inCoreM), numSlices = 1) + .map{block â Array.tabulate(block.nrow)(i â i) -> block} - new DrmRddInput(blockifiedSrc = Some(rdd)) + rdd } http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala index ea10ccb..4a379ec 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala @@ -18,12 +18,13 @@ package org.apache.mahout.sparkbindings.blas import org.apache.log4j.Logger -import scala.reflect.ClassTag +import org.apache.mahout.sparkbindings.DrmRdd +import reflect._ import org.apache.mahout.sparkbindings.drm.DrmRddInput import org.apache.mahout.math._ import scalabindings._ import RLikeOps._ -import org.apache.mahout.math.drm.logical.OpCbind +import org.apache.mahout.math.drm.logical.{OpCbindScalar, OpCbind} import org.apache.spark.SparkContext._ /** Physical cbind */ @@ -31,6 +32,34 @@ object CbindAB { private val log = Logger.getLogger(CbindAB.getClass) + def cbindAScalar[K:ClassTag](op: OpCbindScalar[K], srcA:DrmRddInput[K]) : DrmRddInput[K] = { + val srcRdd = srcA.toDrmRdd() + + val ncol = op.A.ncol + val x = op.x + + val fixedRdd = if (classTag[K] == ClassTag.Int && x != 0.0) + fixIntConsistency(op.asInstanceOf[OpCbindScalar[Int]], + src = srcRdd.asInstanceOf[DrmRdd[Int]]).asInstanceOf[DrmRdd[K]] + else srcRdd + + val left = op.leftBind + + val resultRdd = fixedRdd.map { case (key, vec) => + val newVec = vec.like(ncol + 1) + if (left) { + newVec(1 to ncol) := vec + newVec(0) = x + } else { + newVec(0 until ncol) := vec + newVec(ncol) = x + } + key -> newVec + } + + resultRdd + } + def cbindAB_nograph[K: ClassTag](op: OpCbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = { val a = srcA.toDrmRdd() @@ -88,7 +117,7 @@ object CbindAB { } } - new DrmRddInput(rowWiseSrc = Some(op.ncol -> rdd)) + rdd } http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala index a3caac7..4cd9a74 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala @@ -25,12 +25,14 @@ import org.apache.mahout.sparkbindings.DrmRdd class DrmRddOps[K: ClassTag](private[blas] val rdd: DrmRdd[K]) { + /** Turn RDD into dense row-wise vectors if density threshold is exceeded. */ def densify(threshold: Double = 0.80): DrmRdd[K] = rdd.map({ case (key, v) => val vd = if (!v.isDense && v.getNumNonZeroElements > threshold * v.length) new DenseVector(v) else v key -> vd }) + /** Turn rdd into sparse RDD if density threshold is underrun. */ def sparsify(threshold: Double = 0.80): DrmRdd[K] = rdd.map({ case (key, v) => val vs = if (v.isDense() && v.getNumNonZeroElements <= threshold * v.length) http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala index 4c68c9a..2933ddc 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala @@ -17,6 +17,7 @@ package org.apache.mahout.sparkbindings.blas +import org.apache.mahout.math.drm.logical.OpMapBlock import org.apache.mahout.sparkbindings.drm.DrmRddInput import org.apache.mahout.math.drm.BlockMapFunc import org.apache.mahout.math.scalabindings.RLikeOps._ @@ -24,12 +25,13 @@ import scala.reflect.ClassTag object MapBlock { - def exec[S, R:ClassTag](src: DrmRddInput[S], ncol:Int, bmf:BlockMapFunc[S,R]): DrmRddInput[R] = { + def exec[S, R:ClassTag](src: DrmRddInput[S], operator:OpMapBlock[S,R]): DrmRddInput[R] = { - // We can't use attributes to avoid putting the whole this into closure. - - val rdd = src.toBlockifiedDrmRdd() - .map(blockTuple => { + // We can't use attributes directly in the closure in order to avoid putting the whole object + // into closure. + val bmf = operator.bmf + val ncol = operator.ncol + val rdd = src.toBlockifiedDrmRdd(operator.A.ncol).map(blockTuple => { val out = bmf(blockTuple) assert(out._2.nrow == blockTuple._2.nrow, "block mapping must return same number of rows.") @@ -37,7 +39,8 @@ object MapBlock { out }) - new DrmRddInput(blockifiedSrc = Some(rdd)) + + rdd } } http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala index e73376d..0434a72 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala @@ -1,50 +1,58 @@ package org.apache.mahout.sparkbindings.blas +import org.apache.mahout.sparkbindings.drm + import scala.reflect.ClassTag import org.apache.mahout.sparkbindings.drm.DrmRddInput import org.apache.mahout.math.drm.logical.OpPar import org.apache.spark.rdd.RDD +import scala.math._ + +import org.apache.mahout.logging._ /** Physical adjustment of parallelism */ object Par { + private final implicit val log = getLog(Par.getClass) + def exec[K: ClassTag](op: OpPar[K], src: DrmRddInput[K]): DrmRddInput[K] = { - def adjust[T](rdd: RDD[T]): RDD[T] = - if (op.minSplits > 0) { - if (rdd.partitions.size < op.minSplits) - rdd.coalesce(op.minSplits, shuffle = true) - else rdd.coalesce(rdd.partitions.size) - } else if (op.exactSplits > 0) { - if (op.exactSplits < rdd.partitions.size) - rdd.coalesce(numPartitions = op.exactSplits, shuffle = false) - else if (op.exactSplits > rdd.partitions.size) - rdd.coalesce(numPartitions = op.exactSplits, shuffle = true) - else - rdd.coalesce(rdd.partitions.size) - } else if (op.exactSplits == -1 && op.minSplits == -1) { - - // auto adjustment, try to scale up to either x1Size or x2Size. - val clusterSize = rdd.context.getConf.get("spark.default.parallelism", "1").toInt - - val x1Size = (clusterSize * .95).ceil.toInt - val x2Size = (clusterSize * 1.9).ceil.toInt - - if (rdd.partitions.size <= x1Size) - rdd.coalesce(numPartitions = x1Size, shuffle = true) - else if (rdd.partitions.size <= x2Size) - rdd.coalesce(numPartitions = x2Size, shuffle = true) - else - rdd.coalesce(numPartitions = rdd.partitions.size) - } else rdd.coalesce(rdd.partitions.size) - - if (src.isBlockified) { - val rdd = src.toBlockifiedDrmRdd() - new DrmRddInput[K](blockifiedSrc = Some(adjust(rdd))) + val srcBlockified = src.isBlockified + + val srcRdd = if (srcBlockified) src.toBlockifiedDrmRdd(op.ncol) else src.toDrmRdd() + val srcNParts = srcRdd.partitions.size + + // To what size? + val targetParts = if (op.minSplits > 0) srcNParts max op.minSplits + else if (op.exactSplits > 0) op.exactSplits + else /* auto adjustment */ { + val stdParallelism = srcRdd.context.getConf.get("spark.default.parallelism", "1").toInt + val x1 = 0.95 * stdParallelism + if (srcNParts <= ceil(x1)) ceil(x1).toInt else ceil(2 * x1).toInt + } + + debug(s"par $srcNParts => $targetParts.") + + if (targetParts > srcNParts) { + + // Expanding. Always requires deblockified stuff. May require re-shuffling. + val rdd = src.toDrmRdd().repartition(numPartitions = targetParts) + + rdd + + } else if (targetParts < srcNParts) { + // Shrinking. + + if (srcBlockified) { + drm.rbind(src.toBlockifiedDrmRdd(op.ncol).coalesce(numPartitions = targetParts)) + } else { + src.toDrmRdd().coalesce(numPartitions = targetParts) + } } else { - val rdd = src.toDrmRdd() - new DrmRddInput[K](rowWiseSrc = Some(op.ncol -> adjust(rdd))) + // no adjustment required. + src } + } } http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala index 5037d68..62abba6 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala @@ -31,11 +31,11 @@ object RbindAB { // If any of the inputs is blockified, use blockified inputs if (srcA.isBlockified || srcB.isBlockified) { - val a = srcA.toBlockifiedDrmRdd() - val b = srcB.toBlockifiedDrmRdd() + val a = srcA.toBlockifiedDrmRdd(op.A.ncol) + val b = srcB.toBlockifiedDrmRdd(op.B.ncol) // Union seems to be fine, it is indeed just do partition-level unionization, no shuffles - new DrmRddInput(blockifiedSrc = Some(a ++ b)) + a ++ b } else { @@ -43,7 +43,7 @@ object RbindAB { val a = srcA.toDrmRdd() val b = srcB.toDrmRdd() - new DrmRddInput(rowWiseSrc = Some(op.ncol -> (a ++ b))) + a ++ b } } } http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala index d0a50b5..0284ba2 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala @@ -22,6 +22,6 @@ object Slicing { // TODO: we probably need to re-shuffle result or at least cut down the partitions of 0 size - new DrmRddInput(rowWiseSrc = Some(ncol -> rdd)) + rdd } } http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala index 9a50afa..6b8513f 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala @@ -17,13 +17,17 @@ package org.apache.mahout.sparkbindings +import org.apache.mahout.sparkbindings +import org.apache.spark.rdd.RDD + import scala.reflect.ClassTag -import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSpark, DrmRddInput} import org.apache.spark.SparkContext._ import org.apache.mahout.math._ import org.apache.mahout.math.drm._ import scalabindings._ import RLikeOps._ +import collection._ +import JavaConversions._ /** * This validation contains distributed algorithms that distributed matrix expression optimizer picks @@ -31,8 +35,81 @@ import RLikeOps._ */ package object blas { - implicit def drmRdd2ops[K:ClassTag](rdd:DrmRdd[K]):DrmRddOps[K] = new DrmRddOps[K](rdd) + implicit def drmRdd2ops[K: ClassTag](rdd: DrmRdd[K]): DrmRddOps[K] = new DrmRddOps[K](rdd) + + + /** + * Rekey matrix dataset keys to consequtive int keys. + * @param rdd incoming matrix row-wise dataset + * + * @param computeMap if true, also compute mapping between old and new keys + * @tparam K existing key parameter + * @return + */ + private[mahout] def rekeySeqInts[K: ClassTag](rdd: DrmRdd[K], computeMap: Boolean = true): (DrmRdd[Int], + Option[RDD[(K, Int)]]) = { + + // Spark context please. + val sctx = rdd.context + import sctx._ + + // First, compute partition sizes. + val partSizes = rdd.mapPartitionsWithIndex((part, iter) => Iterator(part -> iter.size)) + + // Collect in-core + .collect() + + // Starting indices + var startInd = new Array[Int](rdd.partitions.size) + + // Save counts + for (pc <- partSizes) startInd(pc._1) = pc._2 + + // compute cumulative sum + val siBcast = broadcast(startInd.scanLeft(0)(_ + _).init) + + // Compute key -> int index map: + val keyMap = if (computeMap) { + Some(rdd + + // Process individual partition with index, output `key -> index` tuple + .mapPartitionsWithIndex((part, iter) => { + + // Start index for this partition + val si = siBcast.value(part) + iter.zipWithIndex.map { case ((key, _), index) => key -> (index + si)} + })) // Some + + } else { + + // Were not asked to compute key mapping + None + } + + // Finally, do the transform + val intRdd = rdd + + // Re-number each partition + .mapPartitionsWithIndex((part, iter) => { + // Start index for this partition + val si = siBcast.value(part) + + // Iterate over data by producing sequential row index and retaining vector value. + iter.zipWithIndex.map { case ((_, vec), ind) => si + ind -> vec} + }) + + // Finally, return drm -> keymap result + + intRdd -> keyMap + + } + + + /** + * Fills in missing rows in an Int-indexed matrix by putting in empty row vectors for the missing + * keys. + */ private[mahout] def fixIntConsistency(op: DrmLike[Int], src: DrmRdd[Int]): DrmRdd[Int] = { if (op.canHaveMissingRows) { @@ -45,20 +122,20 @@ package object blas { // Compute the fix. sc - // Bootstrap full key set - .parallelize(0 until dueRows, numSlices = rdd.partitions.size max 1) + // Bootstrap full key set + .parallelize(0 until dueRows, numSlices = rdd.partitions.size max 1) - // Enable PairedFunctions - .map(_ -> Unit) + // Enable PairedFunctions + .map(_ -> Unit) - // Cogroup with all rows - .cogroup(other = rdd) + // Cogroup with all rows + .cogroup(other = rdd) - // Filter out out-of-bounds - .filter { case (key, _) => key >= 0 && key < dueRows} + // Filter out out-of-bounds + .filter { case (key, _) => key >= 0 && key < dueRows} - // Coalesce and output RHS - .map { case (key, (seqUnit, seqVec)) => + // Coalesce and output RHS + .map { case (key, (seqUnit, seqVec)) => val acc = seqVec.headOption.getOrElse(new SequentialAccessSparseVector(dueCols)) val vec = if (seqVec.size > 0) (acc /: seqVec.tail)(_ + _) else acc key -> vec @@ -68,4 +145,77 @@ package object blas { } + /** Method to do `mxC += a cross b` in-plcae a bit more efficiently than this expression does. */ + def addOuterProduct(mxC: Matrix, a: Vector, b: Vector): Matrix = { + + // Try to pay attention to density a bit here when computing and adding the outer product of + // arow and brow fragment. + if (b.isDense) + for (ela <- a.nonZeroes) mxC(ela.index, ::) := { (i, x) => x + ela * b(i)} + else + for (ela <- a.nonZeroes; elb <- b.nonZeroes()) mxC(ela.index, elb.index) += ela * elb + + mxC + } + + /** + * Compute ranges of more or less even splits of total `nrow` number + * + * @param nrow + * @param numSplits + * @return + */ + @inline + private[blas] def computeEvenSplits(nrow: Long, numSplits: Int): IndexedSeq[Range] = { + require(numSplits <= nrow, "Requested amount of splits greater than number of data points.") + require(nrow >= 1) + require(numSplits >= 1) + + // Base split -- what is our base split size? + val baseSplit = safeToNonNegInt(nrow / numSplits) + + // Slack -- how many splits will have to be incremented by 1 though? + val slack = safeToNonNegInt(nrow % numSplits) + + // Compute ranges. We need to set ranges so that numSplits - slack splits have size of baseSplit; + // and `slack` splits have size baseSplit + 1. Here is how we do it: First, we compute the range + // offsets: + val offsets = (0 to numSplits).map(i => i * (baseSplit + 1) - (0 max i - slack)) + // And then we connect the ranges using gaps between offsets: + offsets.sliding(2).map(offs => offs(0) until offs(1)).toIndexedSeq + } + + /** + * Estimate number of partitions for the product of A %*% B. + * + * We take average per-partition element count of product as higher of the same of A and B. (prefer + * larger partitions of operands). + * + * @param anrow A.nrow + * @param ancol A.ncol + * @param bncol B.ncol + * @param aparts partitions in A + * @param bparts partitions in B + * @return recommended partitions + */ + private[blas] def estimateProductPartitions(anrow:Long, ancol:Long, bncol:Long, aparts:Int, bparts:Int):Int = { + + // Compute per-partition element density in A + val eppA = anrow.toDouble * ancol/ aparts + + // Compute per-partition element density in B + val eppB = ancol.toDouble * bncol / bparts + + // Take the maximum element density into account. Is it a good enough? + val epp = eppA max eppB + + // product partitions + val prodParts = anrow * bncol / epp + + val nparts = math.round(prodParts).toInt max 1 + + // Constrain nparts to maximum of anrow to prevent guaranteed empty partitions. + if (nparts > anrow) anrow.toInt else nparts + } + }
