Repository: mahout
Updated Branches:
refs/heads/master 2715ed201 -> 846307d5b
MAHOUT-1596: implement rbind() operator
This closes apache/mahout#30
Conflicts:
math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/846307d5
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/846307d5
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/846307d5
Branch: refs/heads/master
Commit: 846307d5b03ef7df6d3c890f2a291b4c11581e80
Parents: 2715ed2
Author: Dmitriy Lyubimov <[email protected]>
Authored: Tue Jul 29 13:17:15 2014 -0700
Committer: Dmitriy Lyubimov <[email protected]>
Committed: Tue Jul 29 13:17:15 2014 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../mahout/math/drm/DistributedEngine.scala | 18 +++++++
.../apache/mahout/math/drm/RLikeDrmOps.scala | 2 +
.../mahout/math/drm/logical/OpRbind.scala | 40 ++++++++++++++++
.../mahout/math/drm/RLikeDrmOpsSuiteBase.scala | 23 +++++++++
.../mahout/sparkbindings/SparkEngine.scala | 1 +
.../mahout/sparkbindings/blas/RbindAB.scala | 49 ++++++++++++++++++++
7 files changed, 135 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/846307d5/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 50909e2..538b12b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Mahout Change Log
Release 1.0 - unreleased
+ MAHOUT-1596: implement rbind() operator (Anand Avati and dlyubimov)
+
MAHOUT-1597: A + 1.0 (element-wise scala operation) gives wrong result if
rdd is missing rows, Spark side (dlyubimov)
MAHOUT-1595: MatrixVectorView - implement a proper iterateNonZero() (Anand
Avati via dlyubimov)
http://git-wip-us.apache.org/repos/asf/mahout/blob/846307d5/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git
a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
index 03471fd..d89cc53 100644
---
a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++
b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -22,6 +22,7 @@ import logical._
import org.apache.mahout.math._
import scalabindings._
import RLikeOps._
+import RLikeDrmOps._
import DistributedEngine._
import org.apache.mahout.math.scalabindings._
import org.apache.log4j.Logger
@@ -102,6 +103,22 @@ object DistributedEngine {
case op@OpTimesLeftMatrix(a, b) =>
OpAt(OpTimesRightMatrix(A = OpAt(pass1(b)), right = a.t))
+ // Add vertical row index concatenation for rbind() on DrmLike[Int] fragments
+ case op@OpRbind(a, b) if (implicitly[ClassTag[K]] == ClassTag.Int) =>
+
+ // Make sure the closure sees only local vals, not attributes. The ugly casts below are
+ // needed because the compiler cannot infer that K is the same as Int from the if() guard above.
+ val ma = safeToNonNegInt(a.nrow)
+ val bAdjusted = new OpMapBlock[Int, Int](
+ A = pass1(b.asInstanceOf[DrmLike[Int]]),
+ bmf = {
+ case (keys, block) => keys.map(_ + ma) -> block
+ },
+ identicallyPartitioned = false
+ )
+ val aAdjusted = a.asInstanceOf[DrmLike[Int]]
+ OpRbind(pass1(aAdjusted), bAdjusted).asInstanceOf[DrmLike[K]]
+
// Stop at checkpoints
case cd: CheckpointedDrm[_] => action
@@ -152,6 +169,7 @@ object DistributedEngine {
case OpAB(OpAt(a), b) => OpAtB(pass3(a), pass3(b))
// case OpAB(OpAt(a), b) => OpAt(OpABt(OpAt(pass1(b)), pass1(a)))
case OpAB(a, b) => OpABt(pass3(a), OpAt(pass3(b)))
+
// Rewrite A'x
case op@OpAx(op1@OpAt(a), x) => OpAtx(pass3(a)(op1.classTagA), x)
http://git-wip-us.apache.org/repos/asf/mahout/blob/846307d5/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git
a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
index d7027f2..ae5da71 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
@@ -62,6 +62,8 @@ class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends
DrmLikeOps[K](drm) {
def t: DrmLike[Int] = OpAtAnyKey(A = drm)
def cbind(that: DrmLike[K]) = OpCbind(A = this.drm, B = that)
+
+ /** Builds the logical `rbind` operator ([[OpRbind]]) with this DRM as operand `A` and `that` as operand `B`. */
+ def rbind(that: DrmLike[K]) = OpRbind(A = this.drm, B = that)
}
class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) {
http://git-wip-us.apache.org/repos/asf/mahout/blob/846307d5/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala
----------------------------------------------------------------------
diff --git
a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala
new file mode 100644
index 0000000..d45714b
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+import scala.util.Random
+
+/**
+ * Logical operator for `rbind()`.
+ *
+ * The result has `A.nrow + B.nrow` rows and `A.ncol` columns.
+ *
+ * @param A first operand
+ * @param B second operand; must have the same number of columns as `A`
+ * @tparam K row key type shared by both operands and by the result
+ * @throws IllegalArgumentException if the operands differ in number of columns
+ */
+case class OpRbind[K: ClassTag](
+    override var A: DrmLike[K],
+    override var B: DrmLike[K]
+    ) extends AbstractBinaryOp[K, K, K] {
+
+  // Operand precondition. require() is used instead of assert() because assert() is elidable
+  // (e.g. -Xdisable-assertions) and would then let mismatched operands through unchecked.
+  require(A.ncol == B.ncol, "arguments must have same number of columns")
+
+  // A random tag, drawn once on first access.
+  override protected[mahout] lazy val partitioningTag: Long = Random.nextLong()
+
+  /** R-like syntax for number of rows: the row counts of both operands added together. */
+  def nrow: Long = A.nrow + B.nrow
+
+  /** R-like syntax for number of columns: taken from `A` (checked above to match `B`). */
+  def ncol: Int = A.ncol
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/846307d5/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
----------------------------------------------------------------------
diff --git
a/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
b/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
index 50beccf..3f37bb9 100644
---
a/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
+++
b/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
@@ -488,5 +488,28 @@ trait RLikeDrmOpsSuiteBase extends DistributedMahoutSuite
with Matchers {
(drmB -: controlB).norm should be < 1e-10
}
+
+  test("C = A rbind B") {
+    // Two 2x2 in-core operands and the 4x2 matrix expected from rbind-ing them.
+    val topInCore = dense((1, 2), (3, 5))
+    val bottomInCore = dense((7, 11), (13, 17))
+    val expected = dense((1, 2), (3, 5), (7, 11), (13, 17))
+
+    val drmTop = drmParallelize(topInCore, numPartitions = 2).checkpoint()
+    val drmBottom = drmParallelize(bottomInCore, numPartitions = 2).checkpoint()
+
+    // Difference between the rbind result and the control matrix must be numerically zero.
+    val residual = drmTop.rbind(drmBottom) -: expected
+    residual.norm should be < 1e-10
+  }
+
+  test("C = A rbind B, with empty") {
+    // In-core top operand and the control: the same rows followed by two all-zero rows.
+    val topInCore = dense((1, 2), (3, 5))
+    val expected = dense((1, 2), (3, 5), (0, 0), (0, 0))
+
+    // Bottom operand is an empty 2x2 distributed matrix.
+    val drmEmptyBottom = drmParallelizeEmpty(nrow = 2, ncol = 2, numPartitions = 2)
+    val drmTop = drmParallelize(topInCore, numPartitions = 2).checkpoint()
+
+    val residual = drmTop.rbind(drmEmptyBottom) -: expected
+    residual.norm should be < 1e-10
+  }
+
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/846307d5/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git
a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index c37354f..36223fc 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -253,6 +253,7 @@ object SparkEngine extends DistributedEngine {
case op@OpAtx(a, x) => Ax.atx_with_broadcast(op,
tr2phys(a)(op.classTagA))
case op@OpAewB(a, b, opId) => AewB.a_ew_b(op, tr2phys(a)(op.classTagA),
tr2phys(b)(op.classTagB))
case op@OpCbind(a, b) => CbindAB.cbindAB_nograph(op,
tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+ case op@OpRbind(a, b) => RbindAB.rbindAB(op, tr2phys(a)(op.classTagA),
tr2phys(b)(op.classTagB))
case op@OpAewScalar(a, s, _) => AewB.a_ew_scalar(op,
tr2phys(a)(op.classTagA), s)
case op@OpRowRange(a, _) => Slicing.rowRange(op,
tr2phys(a)(op.classTagA))
case op@OpTimesRightMatrix(a, _) => AinCoreB.rightMultiply(op,
tr2phys(a)(op.classTagA))
http://git-wip-us.apache.org/repos/asf/mahout/blob/846307d5/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
----------------------------------------------------------------------
diff --git
a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
new file mode 100644
index 0000000..5037d68
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.blas
+
+import org.apache.log4j.Logger
+import scala.reflect.ClassTag
+import org.apache.mahout.sparkbindings.drm.DrmRddInput
+import org.apache.mahout.math.drm.logical.OpRbind
+
+/** Physical `rbind`: combines the RDD inputs of both operands with `++`. */
+object RbindAB {
+
+  private val log = Logger.getLogger(RbindAB.getClass)
+
+  /**
+   * Physical implementation of the logical [[OpRbind]] operator.
+   *
+   * @param op   logical operator; its `ncol` is attached to the row-wise result
+   * @param srcA physical input for the first operand
+   * @param srcB physical input for the second operand
+   * @return blockified input if either source is already blockified, row-wise input otherwise
+   */
+  def rbindAB[K: ClassTag](
+      op: OpRbind[K],
+      srcA: DrmRddInput[K],
+      srcB: DrmRddInput[K]): DrmRddInput[K] = {
+
+    val anyBlockified = srcA.isBlockified || srcB.isBlockified
+
+    if (!anyBlockified) {
+
+      // Neither input is blockified: stay row-wise -- no reason to blockify here.
+      val rowsA = srcA.toDrmRdd()
+      val rowsB = srcB.toDrmRdd()
+
+      new DrmRddInput(rowWiseSrc = Some(op.ncol -> (rowsA ++ rowsB)))
+
+    } else {
+
+      // At least one input is blockified: use the blockified form of both.
+      val blocksA = srcA.toBlockifiedDrmRdd()
+      val blocksB = srcB.toBlockifiedDrmRdd()
+
+      // Union seems to be fine: it appears to only combine partitions of both inputs, no shuffles.
+      new DrmRddInput(blockifiedSrc = Some(blocksA ++ blocksB))
+    }
+  }
+}