Repository: mahout Updated Branches: refs/heads/flink-binding 9831771b8 -> 9e9ec79d6
A small renaming of methods of DrmRddInput Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/9e9ec79d Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/9e9ec79d Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/9e9ec79d Branch: refs/heads/flink-binding Commit: 9e9ec79d6ceac7928e78be1dcf48c421c851b4bb Parents: 9831771 Author: Dmitriy Lyubimov <[email protected]> Authored: Sat Oct 24 20:26:56 2015 -0700 Committer: Dmitriy Lyubimov <[email protected]> Committed: Sat Oct 24 20:26:56 2015 -0700 ---------------------------------------------------------------------- .../apache/mahout/sparkbindings/SparkEngine.scala | 2 +- .../apache/mahout/sparkbindings/blas/ABt.scala | 8 ++++---- .../apache/mahout/sparkbindings/blas/AewB.scala | 12 ++++++------ .../mahout/sparkbindings/blas/AinCoreB.scala | 4 ++-- .../org/apache/mahout/sparkbindings/blas/At.scala | 2 +- .../apache/mahout/sparkbindings/blas/AtA.scala | 4 ++-- .../apache/mahout/sparkbindings/blas/AtB.scala | 12 ++++++------ .../org/apache/mahout/sparkbindings/blas/Ax.scala | 4 ++-- .../mahout/sparkbindings/blas/CbindAB.scala | 6 +++--- .../mahout/sparkbindings/blas/MapBlock.scala | 2 +- .../apache/mahout/sparkbindings/blas/Par.scala | 8 ++++---- .../mahout/sparkbindings/blas/RbindAB.scala | 8 ++++---- .../mahout/sparkbindings/blas/Slicing.scala | 2 +- .../sparkbindings/drm/CheckpointedDrmSpark.scala | 18 +++++++++--------- .../drm/CheckpointedDrmSparkOps.scala | 2 +- .../mahout/sparkbindings/drm/DrmRddInput.scala | 4 ++-- 16 files changed, 49 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala index 41e966b..fdbd950 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala @@ -104,7 +104,7 @@ object SparkEngine extends DistributedEngine { BlockReduceFunc): Matrix = { import drm._ - drm.toBlockifiedDrmRdd(ncol = drm.ncol).map(bmf(_)).reduce(rf) + drm.asBlockified(ncol = drm.ncol).map(bmf(_)).reduce(rf) } /** http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala index 11e2bad..4e77739 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala @@ -73,9 +73,9 @@ object ABt { srcB: DrmRddInput[Int]): DrmRddInput[K] = { // Blockify everything. - val blocksA = srcA.toBlockifiedDrmRdd(operator.A.ncol) + val blocksA = srcA.asBlockified(operator.A.ncol) - val blocksB = srcB.toBlockifiedDrmRdd(operator.B.ncol) + val blocksB = srcB.asBlockified(operator.B.ncol) val prodNCol = operator.ncol val prodNRow = operator.nrow @@ -212,7 +212,7 @@ object ABt { srcB: DrmRddInput[Int]): DrmRddInput[K] = { // Blockify everything. - val blocksA = srcA.toBlockifiedDrmRdd(operator.A.ncol) + val blocksA = srcA.asBlockified(operator.A.ncol) // Mark row-blocks with group id .mapPartitionsWithIndex((part, iter) => { @@ -232,7 +232,7 @@ object ABt { } }) - val blocksB = srcB.toBlockifiedDrmRdd(operator.B.ncol) + val blocksB = srcB.asBlockified(operator.B.ncol) // Final product's geometry. We want to extract that into local variables since we want to use // them as closure attributes. 
http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala index 8a90398..8e3a19a 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala @@ -67,8 +67,8 @@ object AewB { case default => throw new IllegalArgumentException("Unsupported elementwise operator:%s.".format(opId)) } - val a = srcA.toDrmRdd() - val b = srcB.toDrmRdd() + val a = srcA.asRowWise() + val b = srcB.asRowWise() debug(s"A${op.op}B: #partsA=${a.partitions.size},#partsB=${b.partitions.size}.") @@ -120,10 +120,10 @@ object AewB { // Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing // rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar. val aBlockRdd = if (implicitly[ClassTag[K]] == ClassTag.Int && op.A.canHaveMissingRows && evalZeros) { - val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]]) + val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.asRowWise().asInstanceOf[DrmRdd[Int]]) drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]] } else { - srcA.toBlockifiedDrmRdd(op.A.ncol) + srcA.asBlockified(op.A.ncol) } val rdd = aBlockRdd.map {case (keys, block) => @@ -169,10 +169,10 @@ object AewB { // Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing // rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar. 
val aBlockRdd = if (implicitly[ClassTag[K]] == ClassTag.Int && op.A.canHaveMissingRows) { - val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]]) + val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.asRowWise().asInstanceOf[DrmRdd[Int]]) drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]] } else { - srcA.toBlockifiedDrmRdd(op.A.ncol) + srcA.asBlockified(op.A.ncol) } debug(s"A${op.op}$scalar: #parts=${aBlockRdd.partitions.size}.") http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala index 5f9f84a..1894495 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala @@ -25,7 +25,7 @@ object AinCoreB { } private def rightMultiply_diag[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = { - val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol) + val rddA = srcA.asBlockified(op.A.ncol) implicit val ctx:DistributedContext = rddA.context val dg = drmBroadcast(op.right.viewDiagonal()) @@ -41,7 +41,7 @@ object AinCoreB { private def rightMultiply_common[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = { - val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol) + val rddA = srcA.asBlockified(op.A.ncol) implicit val sc:DistributedContext = rddA.sparkContext debug(s"operator A %*% inCoreB. 
#parts=${rddA.partitions.size}.") http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala index 5789bd2..fa25b73 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala @@ -46,7 +46,7 @@ object At { debug("operator A'.") - val drmRdd = srcA.toBlockifiedDrmRdd(operator.A.ncol) + val drmRdd = srcA.asBlockified(operator.A.ncol) val numPartitions = drmRdd.partitions.size val ncol = operator.ncol http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala index a212878..50a4b19 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala @@ -50,7 +50,7 @@ object AtA { // If we can comfortably fit upper-triangular operator into a map memory, we will run slim // algorithm with upper-triangular accumulators in maps. 
- val inCoreA = at_a_slim(srcRdd = srcRdd.toDrmRdd(), operator = operator) + val inCoreA = at_a_slim(srcRdd = srcRdd.asRowWise(), operator = operator) val drmRdd = parallelizeInCore(inCoreA, numPartitions = 1)(sc = srcRdd.sparkContext) drmRdd @@ -58,7 +58,7 @@ object AtA { // Otherwise, we need to run a distributed, big version // new DrmRddInput(rowWiseSrc = Some(operator.ncol, at_a_nongraph(srcRdd = srcRdd, op = operator))) - at_a_nongraph_mmul(srcRdd = srcRdd.toBlockifiedDrmRdd(operator.A.ncol), op = operator) + at_a_nongraph_mmul(srcRdd = srcRdd.asBlockified(operator.A.ncol), op = operator) } } http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala index 45705a9..f7ad575 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala @@ -51,8 +51,8 @@ object AtB { def atb_nograph[A: ClassTag](operator: OpAtB[A], srcA: DrmRddInput[A], srcB: DrmRddInput[A], zippable: Boolean = false): DrmRddInput[Int] = { - val rddA = srcA.toDrmRdd() - val rddB = srcB.toDrmRdd() + val rddA = srcA.asRowWise() + val rddB = srcB.asRowWise() val prodNCol = operator.ncol @@ -99,8 +99,8 @@ object AtB { val prodNRow = safeToNonNegInt(operator.nrow) val aNRow = safeToNonNegInt(operator.A.nrow) - val rddA = srcA.toDrmRdd() - val rddB = srcB.toDrmRdd() + val rddA = srcA.asRowWise() + val rddB = srcB.asRowWise() // Approximate number of final partitions. We take bigger partitions as our guide to number of // elements per partition. TODO: do it better. 
@@ -119,8 +119,8 @@ object AtB { debug("mmul-A'B - zip: are identically distributed, performing row-wise zip.") - val blockdRddA = srcA.toBlockifiedDrmRdd(operator.A.ncol) - val blockdRddB = srcB.toBlockifiedDrmRdd(operator.B.ncol) + val blockdRddA = srcA.asBlockified(operator.A.ncol) + val blockdRddB = srcB.asBlockified(operator.B.ncol) blockdRddA http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala index 629accd..9705838 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala @@ -15,7 +15,7 @@ object Ax { def ax_with_broadcast[K: ClassTag](op: OpAx[K], srcA: DrmRddInput[K]): DrmRddInput[K] = { - val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol) + val rddA = srcA.asBlockified(op.A.ncol) implicit val sc: DistributedContext = rddA.sparkContext val bcastX = drmBroadcast(op.x) @@ -30,7 +30,7 @@ object Ax { def atx_with_broadcast(op: OpAtx, srcA: DrmRddInput[Int]): DrmRddInput[Int] = { - val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol) + val rddA = srcA.asBlockified(op.A.ncol) implicit val dc:DistributedContext = rddA.sparkContext val bcastX = drmBroadcast(op.x) http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala index 4a379ec..395ba4c 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala +++ 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala @@ -33,7 +33,7 @@ object CbindAB { private val log = Logger.getLogger(CbindAB.getClass) def cbindAScalar[K:ClassTag](op: OpCbindScalar[K], srcA:DrmRddInput[K]) : DrmRddInput[K] = { - val srcRdd = srcA.toDrmRdd() + val srcRdd = srcA.asRowWise() val ncol = op.A.ncol val x = op.x @@ -62,8 +62,8 @@ object CbindAB { def cbindAB_nograph[K: ClassTag](op: OpCbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = { - val a = srcA.toDrmRdd() - val b = srcB.toDrmRdd() + val a = srcA.asRowWise() + val b = srcB.asRowWise() val n = op.ncol val n1 = op.A.ncol val n2 = n - n1 http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala index 2933ddc..b6bc961 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala @@ -31,7 +31,7 @@ object MapBlock { // into closure. 
val bmf = operator.bmf val ncol = operator.ncol - val rdd = src.toBlockifiedDrmRdd(operator.A.ncol).map(blockTuple => { + val rdd = src.asBlockified(operator.A.ncol).map(blockTuple => { val out = bmf(blockTuple) assert(out._2.nrow == blockTuple._2.nrow, "block mapping must return same number of rows.") http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala index 0434a72..d31e2f9 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala @@ -19,7 +19,7 @@ object Par { val srcBlockified = src.isBlockified - val srcRdd = if (srcBlockified) src.toBlockifiedDrmRdd(op.ncol) else src.toDrmRdd() + val srcRdd = if (srcBlockified) src.asBlockified(op.ncol) else src.asRowWise() val srcNParts = srcRdd.partitions.size // To what size? @@ -36,7 +36,7 @@ object Par { if (targetParts > srcNParts) { // Expanding. Always requires deblockified stuff. May require re-shuffling. - val rdd = src.toDrmRdd().repartition(numPartitions = targetParts) + val rdd = src.asRowWise().repartition(numPartitions = targetParts) rdd @@ -44,9 +44,9 @@ object Par { // Shrinking. if (srcBlockified) { - drm.rbind(src.toBlockifiedDrmRdd(op.ncol).coalesce(numPartitions = targetParts)) + drm.rbind(src.asBlockified(op.ncol).coalesce(numPartitions = targetParts)) } else { - src.toDrmRdd().coalesce(numPartitions = targetParts) + src.asRowWise().coalesce(numPartitions = targetParts) } } else { // no adjustment required. 
http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala index 62abba6..5fccde2 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala @@ -31,8 +31,8 @@ object RbindAB { // If any of the inputs is blockified, use blockified inputs if (srcA.isBlockified || srcB.isBlockified) { - val a = srcA.toBlockifiedDrmRdd(op.A.ncol) - val b = srcB.toBlockifiedDrmRdd(op.B.ncol) + val a = srcA.asBlockified(op.A.ncol) + val b = srcB.asBlockified(op.B.ncol) // Union seems to be fine, it is indeed just do partition-level unionization, no shuffles a ++ b @@ -40,8 +40,8 @@ object RbindAB { } else { // Otherwise, use row-wise inputs -- no reason to blockify here. 
- val a = srcA.toDrmRdd() - val b = srcB.toDrmRdd() + val a = srcA.asRowWise() + val b = srcB.asRowWise() a ++ b } http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala index 0284ba2..a100443 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala @@ -8,7 +8,7 @@ object Slicing { def rowRange(op: OpRowRange, srcA: DrmRddInput[Int]): DrmRddInput[Int] = { val rowRange = op.rowRange val ncol = op.ncol - val rdd = srcA.toDrmRdd() + val rdd = srcA.asRowWise() // Filter the rows in the range only .filter({ http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala index 797a5c2..38007e0 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala @@ -123,7 +123,7 @@ class CheckpointedDrmSpark[K: ClassTag]( // since currently spark #collect() requires Serializeable support, // we serialize DRM vectors into byte arrays on backend and restore Vector // instances on the front end: - val data = rddInput.toDrmRdd().map(t => (t._1, t._2)).collect() + val data = rddInput.asRowWise().map(t => (t._1, t._2)).collect() val m = if (data.forall(_._2.isDense)) @@ -159,13 +159,13 @@ class CheckpointedDrmSpark[K: ClassTag]( 
// Map backing RDD[(K,Vector)] to RDD[(K)Writable,VectorWritable)] and save. if (ktag.runtimeClass == classOf[Int]) { - rddInput.toDrmRdd() + rddInput.asRowWise() .map( x => (new IntWritable(x._1.asInstanceOf[Int]), new VectorWritable(x._2))).saveAsSequenceFile(path) } else if (ktag.runtimeClass == classOf[String]){ - rddInput.toDrmRdd() + rddInput.asRowWise() .map( x => (new Text(x._1.asInstanceOf[String]), new VectorWritable(x._2))).saveAsSequenceFile(path) } else if (ktag.runtimeClass == classOf[Long]) { - rddInput.toDrmRdd() + rddInput.asRowWise() .map( x => (new LongWritable(x._1.asInstanceOf[Long]), new VectorWritable(x._2))).saveAsSequenceFile(path) } else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag)) @@ -176,7 +176,7 @@ class CheckpointedDrmSpark[K: ClassTag]( val intRowIndex = classTag[K] == classTag[Int] if (intRowIndex) { - val rdd = cache().rddInput.toDrmRdd().asInstanceOf[DrmRdd[Int]] + val rdd = cache().rddInput.asRowWise().asInstanceOf[DrmRdd[Int]] // I guess it is a suitable place to compute int keys consistency test here because we know // that nrow can be computed lazily, which always happens when rdd is already available, cached, @@ -189,21 +189,21 @@ class CheckpointedDrmSpark[K: ClassTag]( intFixExtra = (maxPlus1 - rowCount) max 0L maxPlus1 } else - cache().rddInput.toDrmRdd().count() + cache().rddInput.asRowWise().count() } protected def computeNCol = { rddInput.isBlockified match { - case true ⇒ rddInput.toBlockifiedDrmRdd(throw new AssertionError("not reached")) + case true ⇒ rddInput.asBlockified(throw new AssertionError("not reached")) .map(_._2.ncol).reduce(max(_, _)) - case false ⇒ cache().rddInput.toDrmRdd().map(_._2.length).fold(-1)(max(_, _)) + case false ⇒ cache().rddInput.asRowWise().map(_._2.length).fold(-1)(max(_, _)) } } protected def computeNNonZero = - cache().rddInput.toDrmRdd().map(_._2.getNumNonZeroElements.toLong).sum().toLong + 
cache().rddInput.asRowWise().map(_._2.getNumNonZeroElements.toLong).sum().toLong /** Changes the number of rows in the DRM without actually touching the underlying data. Used to * redimension a DRM after it has been created, which implies some blank, non-existent rows. http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala index abcfc64..25953e1 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala @@ -11,6 +11,6 @@ class CheckpointedDrmSparkOps[K: ClassTag](drm: CheckpointedDrm[K]) { private[sparkbindings] val sparkDrm = drm.asInstanceOf[CheckpointedDrmSpark[K]] /** Spark matrix customization exposure */ - def rdd = sparkDrm.rddInput.toDrmRdd() + def rdd = sparkDrm.rddInput.asRowWise() } http://git-wip-us.apache.org/repos/asf/mahout/blob/9e9ec79d/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala index d9dbada..5c9319a 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala @@ -31,10 +31,10 @@ class DrmRddInput[K: ClassTag](private val input: Either[DrmRdd[K], BlockifiedDr def isRowWise: Boolean = input.isLeft - def toDrmRdd(): DrmRdd[K] = input.left.getOrElse(deblockify(rdd = input.right.get)) + def asRowWise(): 
DrmRdd[K] = input.left.getOrElse(deblockify(rdd = input.right.get)) /** Use late binding for this. It may or may not be needed, depending on current config. */ - def toBlockifiedDrmRdd(ncol: ⇒ Int) = input.right.getOrElse(blockify(rdd = input.left.get, blockncol = ncol)) + def asBlockified(ncol: ⇒ Int) = input.right.getOrElse(blockify(rdd = input.left.get, blockncol = ncol)) def sparkContext: SparkContext = backingRdd.sparkContext
