Repository: mahout Updated Branches: refs/heads/master 9bfb76732 -> 63cebf76e
MAHOUT-1583: cbind() operator for Scala DRMs This closes apache/mahout#20 Squashed commit of the following: commit 1baf3fcebf2cd49eb6487afe4eaed689653a3562 Merge: 41f0671 9bfb767 Author: Dmitriy Lyubimov <[email protected]> Date: Mon Jul 7 12:32:34 2014 -0700 Merge branch 'master' into MAHOUT-1583 commit 41f0671c9ccca81883475437fe18056563c1f8ac Author: Dmitriy Lyubimov <[email protected]> Date: Mon Jul 7 12:27:26 2014 -0700 Adding assignment time comparison test commit 6e115c87639ffb111b2e089d7ecdf0eaeecbb02b Author: Dmitriy Lyubimov <[email protected]> Date: Mon Jun 16 18:42:42 2014 -0700 + Licenses. commit d3e905b5a92735822efe717f8b7a57c80bc56478 Author: Dmitriy Lyubimov <[email protected]> Date: Mon Jun 16 18:09:59 2014 -0700 initial writeup Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/63cebf76 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/63cebf76 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/63cebf76 Branch: refs/heads/master Commit: 63cebf76e9c7746a1851841e37f5873704e086e8 Parents: 9bfb767 Author: Dmitriy Lyubimov <[email protected]> Authored: Mon Jul 7 12:34:12 2014 -0700 Committer: Dmitriy Lyubimov <[email protected]> Committed: Mon Jul 7 12:34:12 2014 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../apache/mahout/math/drm/RLikeDrmOps.scala | 2 + .../mahout/math/drm/logical/OpCbind.scala | 42 +++++++++ .../math/scalabindings/MatrixOpsSuite.scala | 51 ++++++++++- .../mahout/sparkbindings/SparkEngine.scala | 1 + .../mahout/sparkbindings/blas/CbindAB.scala | 95 ++++++++++++++++++++ .../sparkbindings/drm/RLikeDrmOpsSuite.scala | 70 ++++++++++----- 7 files changed, 237 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/63cebf76/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 23ad49e..7ea84c5 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -2,6 +2,8 @@ Mahout Change Log Release 1.0 - unreleased + MAHOUT-1583: cbind() operator for Scala DRMs + MAHOUT-1541, MAHOUT-1568, MAHOUT-1569: Created text-delimited file I/O traits and classes on spark, a MahoutDriver for a CLI and a ItemSimilairtyDriver using the CLI MAHOUT-1573: More explicit parallelism adjustments in math-scala DRM apis; elements of automatic parallelism management (dlyubimov) http://git-wip-us.apache.org/repos/asf/mahout/blob/63cebf76/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala index 7ac5577..d7027f2 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala @@ -60,6 +60,8 @@ class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends DrmLikeOps[K](drm) { def %*%(that: Vector): DrmLike[K] = :%*%(that) def t: DrmLike[Int] = OpAtAnyKey(A = drm) + + def cbind(that: DrmLike[K]) = OpCbind(A = this.drm, B = that) } class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) { http://git-wip-us.apache.org/repos/asf/mahout/blob/63cebf76/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala new file mode 100644 index 0000000..1425264 --- /dev/null +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.math.drm.logical + +import scala.reflect.ClassTag +import org.apache.mahout.math.drm.DrmLike +import scala.util.Random + +/** cbind() logical operator */ +case class OpCbind[K: ClassTag]( + override var A: DrmLike[K], + override var B: DrmLike[K] + ) extends AbstractBinaryOp[K, K, K] { + + assert(A.nrow == B.nrow, "arguments must have same number of rows") + + override protected[mahout] lazy val partitioningTag: Long = + if (A.partitioningTag == B.partitioningTag) A.partitioningTag + else Random.nextLong() + + /** R-like syntax for number of rows. */ + def nrow: Long = A.nrow + + /** R-like syntax for number of columns */ + def ncol: Int = A.ncol + B.ncol + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/63cebf76/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala index 8374a9b..c6fccbc 100644 --- a/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala +++ b/math-scala/src/test/scala/org/apache/mahout/math/scalabindings/MatrixOpsSuite.scala @@ -21,19 +21,20 @@ import org.scalatest.{Matchers, FunSuite} import MatrixOps._ import scala._ import org.apache.mahout.test.MahoutSuite +import org.apache.mahout.math.{RandomAccessSparseVector, SequentialAccessSparseVector, Matrices} +import org.apache.mahout.common.RandomUtils class MatrixOpsSuite extends FunSuite with MahoutSuite { - test("equivalence") { val a = dense((1, 2, 3), (3, 4, 5)) val b = dense((1, 2, 3), (3, 4, 5)) val c = dense((1, 4, 3), (3, 4, 5)) assert(a === b) assert(a !== c) - } + test("elementwise plus, minus") { val a = dense((1, 2, 3), (3, 4, 5)) val b = dense((1, 1, 2), (2, 1, 1)) @@ -42,7 +43,6 @@ class MatrixOpsSuite extends FunSuite with MahoutSuite { assert(c(0, 0) == 2) assert(c(1, 2) == 6) println(c.toString) - } test("matrix, vector slicing") { @@ -132,7 +132,52 @@ class MatrixOpsSuite extends FunSuite with MahoutSuite { ) a.numNonZeroElementsPerColumn() should equal(dvec(3,2,4)) + } + + test("Vector Assignment performance") { + + val n = 1000 + val k = (n * 0.1).toInt + val nIters = 10000 + + val rnd = RandomUtils.getRandom + + val src = new SequentialAccessSparseVector(n) + for (i <- 0 until k) src(rnd.nextInt(n)) = rnd.nextDouble() + + val times = (0 until 50).map { i => + val ms = System.currentTimeMillis() + var j = 0 + while (j < nIters) { + new SequentialAccessSparseVector(n) := src + j += 1 + } + System.currentTimeMillis() - ms + } + + .tail + + val avgTime = times.sum.toDouble / times.size + + printf("Average assignment seqSparse2seqSparse time: %.3f ms\n", avgTime) + + val times2 = (0 until 50).map { i => + val ms = System.currentTimeMillis() + var j = 0 + while (j < nIters) { + new SequentialAccessSparseVector(n) := (new RandomAccessSparseVector(n) := src) + j += 1 + } + System.currentTimeMillis() - ms + } + + .tail + + val avgTime2 = times2.sum.toDouble / times2.size + + printf("Average assignment seqSparse2seqSparse via Random Access Sparse time: %.3f ms\n", avgTime2) } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/63cebf76/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala index 996eb1b..dbdc934 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala @@ -252,6 +252,7 @@ object SparkEngine extends DistributedEngine { case op@OpAx(a, x) => Ax.ax_with_broadcast(op, tr2phys(a)(op.classTagA)) case op@OpAtx(a, x) => Ax.atx_with_broadcast(op, tr2phys(a)(op.classTagA)) case op@OpAewB(a, b, opId) => AewB.a_ew_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB)) + case op@OpCbind(a, b) => CbindAB.cbindAB_nograph(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB)) case op@OpAewScalar(a, s, _) => AewB.a_ew_scalar(op, tr2phys(a)(op.classTagA), s) case op@OpRowRange(a, _) => Slicing.rowRange(op, tr2phys(a)(op.classTagA)) case op@OpTimesRightMatrix(a, _) => AinCoreB.rightMultiply(op, tr2phys(a)(op.classTagA)) http://git-wip-us.apache.org/repos/asf/mahout/blob/63cebf76/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala new file mode 100644 index 0000000..ea10ccb --- /dev/null +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.sparkbindings.blas + +import org.apache.log4j.Logger +import scala.reflect.ClassTag +import org.apache.mahout.sparkbindings.drm.DrmRddInput +import org.apache.mahout.math._ +import scalabindings._ +import RLikeOps._ +import org.apache.mahout.math.drm.logical.OpCbind +import org.apache.spark.SparkContext._ + +/** Physical cbind */ +object CbindAB { + + private val log = Logger.getLogger(CbindAB.getClass) + + def cbindAB_nograph[K: ClassTag](op: OpCbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = { + + val a = srcA.toDrmRdd() + val b = srcB.toDrmRdd() + val n = op.ncol + val n1 = op.A.ncol + val n2 = n - n1 + + // Check if A and B are identically partitioned AND keyed. if they are, then just perform zip + // instead of join, and apply the op map-side. Otherwise, perform join and apply the op + // reduce-side. + val rdd = if (op.isIdenticallyPartitioned(op.A)) { + + log.debug("applying zipped cbind()") + + a + .zip(b) + .map { + case ((keyA, vectorA), (keyB, vectorB)) => + assert(keyA == keyB, "inputs are claimed identically partitioned, but they are not identically keyed") + + val dense = vectorA.isDense && vectorB.isDense + val vec: Vector = if (dense) new DenseVector(n) else new SequentialAccessSparseVector(n) + vec(0 until n1) := vectorA + vec(n1 until n) := vectorB + keyA -> vec + } + } else { + + log.debug("applying cbind as join") + + a + .cogroup(b, numPartitions = a.partitions.size max b.partitions.size) + .map { + case (key, (vectorSeqA, vectorSeqB)) => + + // Generally, after co-grouping, we should not accept anything but 1 to 1 in the left and + // the right groups. However let's be flexible here, if it does happen, recombine them into 1. + + val vectorA = if (vectorSeqA.size <= 1) + vectorSeqA.headOption.getOrElse(new RandomAccessSparseVector(n1)) + else + (vectorSeqA.head.like() /: vectorSeqA)(_ += _) + + val vectorB = if ( vectorSeqB.size <= 1) + vectorSeqB.headOption.getOrElse(new RandomAccessSparseVector(n2)) + else + (vectorSeqB.head.like() /: vectorSeqB)(_ += _) + + val dense = vectorA.isDense && vectorB.isDense + val vec:Vector = if (dense) new DenseVector(n) else new SequentialAccessSparseVector(n) + vec(0 until n1) := vectorA + vec(n1 until n) := vectorB + key -> vec + } + } + + new DrmRddInput(rowWiseSrc = Some(op.ncol -> rdd)) + + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/63cebf76/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala index 3cd49cd..50f8978 100644 --- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/RLikeDrmOpsSuite.scala @@ -94,12 +94,12 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext { B.colSums() - val x = drmBroadcast(dvec(0,0)) - val x2 = drmBroadcast(dvec(0,0)) + val x = drmBroadcast(dvec(0, 0)) + val x2 = drmBroadcast(dvec(0, 0)) // Distributed operation val C = (B.t %*% A.t).t.mapBlock() { case (keys, block) => - for (row <- 0 until block.nrow) block(row,::) += x.value + x2 + for (row <- 0 until block.nrow) block(row, ::) += x.value + x2 keys -> block } @@ -133,7 +133,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext { val B = drmParallelize(inCoreB, numPartitions = 2) // Re-key B into DrmLike[String] instead of [Int] .mapBlock()({ - case (keys,block) => keys.map(_.toString) -> block + case (keys, block) => keys.map(_.toString) -> block }) val C = A %*% B @@ -146,7 +146,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext { test("C = At %*% B , join") { - val inCoreA = dense((1, 2), (3, 4),(-3, -5)) + val inCoreA = dense((1, 2), (3, 4), (-3, -5)) val inCoreB = dense((3, 5), (4, 6), (0, 1)) val A = drmParallelize(inCoreA, numPartitions = 2) @@ -154,7 +154,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext { val C = A.t %*% B - SparkEngine.optimizerRewrite(C) should equal (OpAtB[Int](A,B)) + SparkEngine.optimizerRewrite(C) should equal(OpAtB[Int](A, B)) val inCoreC = C.collect val inCoreControlC = inCoreA.t %*% inCoreB @@ -165,7 +165,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext { test("C = At %*% B , join, String-keyed") { - val inCoreA = dense((1, 2), (3, 4),(-3, -5)) + val inCoreA = dense((1, 2), (3, 4), (-3, -5)) val inCoreB = dense((3, 5), (4, 6), (0, 1)) val A = drmParallelize(inCoreA, numPartitions = 2) @@ -180,7 +180,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext { val C = A.t %*% B - SparkEngine.optimizerRewrite(C) should equal (OpAtB[String](A,B)) + SparkEngine.optimizerRewrite(C) should equal(OpAtB[String](A, B)) val inCoreC = C.collect val inCoreControlC = inCoreA.t %*% inCoreB @@ -191,18 +191,18 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext { test("C = At %*% B , zippable, String-keyed") { - val inCoreA = dense((1, 2), (3, 4),(-3, -5)) + val inCoreA = dense((1, 2), (3, 4), (-3, -5)) val A = drmParallelize(inCoreA, numPartitions = 2) .mapBlock()({ case (keys, block) => keys.map(_.toString) -> block }) - val B = A + 1.0 + val B = A + 1.0 val C = A.t %*% B - SparkEngine.optimizerRewrite(C) should equal (OpAtB[String](A,B)) + SparkEngine.optimizerRewrite(C) should equal(OpAtB[String](A, B)) val inCoreC = C.collect val inCoreControlC = inCoreA.t %*% (inCoreA + 1.0) @@ -319,12 +319,12 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext { val A = drmParallelize(inCoreA, numPartitions = 2) - printf("A.nrow=%d.\n",A.rdd.count()) + printf("A.nrow=%d.\n", A.rdd.count()) // Create B which would be identically partitioned to A. mapBlock() by default will do the trick. val B = A.mapBlock() { case (keys, block) => - val bBlock = block.like() := ((r,c,v) => util.Random.nextDouble()) + val bBlock = block.like() := ((r, c, v) => util.Random.nextDouble()) keys -> bBlock } // Prevent repeated computation non-determinism @@ -394,7 +394,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext { val inCoreB = dense((3, 5), (4, 6)) val B = drmParallelize(inCoreB, numPartitions = 2) -// val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(CacheHint.MEMORY_ONLY_SER) + // val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(CacheHint.MEMORY_ONLY_SER) val A = (drmParallelize(inCoreA, numPartitions = 2) + B).checkpoint(CacheHint.MEMORY_ONLY) val C = A + B @@ -403,17 +403,17 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext { val inCoreD = (A + B).collect // Actual - val inCoreCControl = inCoreA + inCoreB * 2.0 + val inCoreCControl = inCoreA + inCoreB * 2.0 (inCoreC - inCoreCControl).norm should be < 1E-10 (inCoreD - inCoreCControl).norm should be < 1E-10 } - test ("general side") { + test("general side") { val sc = implicitly[DistributedContext] - val k1 = sc.parallelize(Seq(ArrayBuffer(0,1,2,3))) -// .persist(StorageLevel.MEMORY_ONLY) // -- this will demonstrate immutability side effect! - .persist(StorageLevel.MEMORY_ONLY_SER) + val k1 = sc.parallelize(Seq(ArrayBuffer(0, 1, 2, 3))) + // .persist(StorageLevel.MEMORY_ONLY) // -- this will demonstrate immutability side effect! + .persist(StorageLevel.MEMORY_ONLY_SER) println(k1.map(_ += 4).collect.head) println(k1.map(_ += 4).collect.head) @@ -444,7 +444,7 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext { val drmA = drmParallelize(inCoreA, numPartitions = 2) - SparkEngine.optimizerRewrite(drmA.t %*% x) should equal (OpAtx(drmA, x)) + SparkEngine.optimizerRewrite(drmA.t %*% x) should equal(OpAtx(drmA, x)) val atx = (drmA.t %*% x).collect(::, 0) @@ -459,8 +459,8 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext { ) val drmA = drmParallelize(inCoreA, numPartitions = 2) - drmA.colSums() should equal (inCoreA.colSums()) - drmA.colMeans() should equal (inCoreA.colMeans()) + drmA.colSums() should equal(inCoreA.colSums()) + drmA.colMeans() should equal(inCoreA.colMeans()) } test("numNonZeroElementsPerColumn") { @@ -472,7 +472,31 @@ class RLikeDrmOpsSuite extends FunSuite with Matchers with MahoutLocalContext { ) val drmA = drmParallelize(inCoreA, numPartitions = 2) - drmA.numNonZeroElementsPerColumn() should equal (inCoreA.numNonZeroElementsPerColumn()) + drmA.numNonZeroElementsPerColumn() should equal(inCoreA.numNonZeroElementsPerColumn()) + } + + test("C = A cbind B, cogroup") { + + val inCoreA = dense((1, 2), (3, 4)) + val inCoreB = dense((3, 5), (4, 6)) + val controlC = dense((1, 2, 3, 5), (3, 4, 4, 6)) + + val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint() + val B = drmParallelize(inCoreB, numPartitions = 2).checkpoint() + + (A.cbind(B) -: controlC).norm should be < 1e-10 + + } + + test("C = A cbind B, zip") { + + val inCoreA = dense((1, 2), (3, 4)) + val controlC = dense((1, 2, 2, 3), (3, 4, 4, 5)) + + val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint() + + (A.cbind(A + 1.0) -: controlC).norm should be < 1e-10 + } }
