Repository: mahout
Updated Branches:
  refs/heads/master 2715ed201 -> 846307d5b


MAHOUT-1596: implement rbind() operator

This closes apache/mahout#30

Conflicts:
        
math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/846307d5
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/846307d5
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/846307d5

Branch: refs/heads/master
Commit: 846307d5b03ef7df6d3c890f2a291b4c11581e80
Parents: 2715ed2
Author: Dmitriy Lyubimov <[email protected]>
Authored: Tue Jul 29 13:17:15 2014 -0700
Committer: Dmitriy Lyubimov <[email protected]>
Committed: Tue Jul 29 13:17:15 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |  2 +
 .../mahout/math/drm/DistributedEngine.scala     | 18 +++++++
 .../apache/mahout/math/drm/RLikeDrmOps.scala    |  2 +
 .../mahout/math/drm/logical/OpRbind.scala       | 40 ++++++++++++++++
 .../mahout/math/drm/RLikeDrmOpsSuiteBase.scala  | 23 +++++++++
 .../mahout/sparkbindings/SparkEngine.scala      |  1 +
 .../mahout/sparkbindings/blas/RbindAB.scala     | 49 ++++++++++++++++++++
 7 files changed, 135 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/846307d5/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 50909e2..538b12b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Mahout Change Log
 
 Release 1.0 - unreleased
 
+  MAHOUT-1596: implement rbind() operator (Anand Avati and dlyubimov)
+
   MAHOUT-1597: A + 1.0 (element-wise scala operation) gives wrong result if 
rdd is missing rows, Spark side (dlyubimov)
 
   MAHOUT-1595: MatrixVectorView - implement a proper iterateNonZero() (Anand 
Avati via dlyubimov)

http://git-wip-us.apache.org/repos/asf/mahout/blob/846307d5/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
index 03471fd..d89cc53 100644
--- 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -22,6 +22,7 @@ import logical._
 import org.apache.mahout.math._
 import scalabindings._
 import RLikeOps._
+import RLikeDrmOps._
 import DistributedEngine._
 import org.apache.mahout.math.scalabindings._
 import org.apache.log4j.Logger
@@ -102,6 +103,22 @@ object DistributedEngine {
       case op@OpTimesLeftMatrix(a, b) =>
         OpAt(OpTimesRightMatrix(A = OpAt(pass1(b)), right = a.t))
 
+      // Add vertical row index concatenation for rbind() on DrmLike[Int] 
fragments
+      case op@OpRbind(a, b) if (implicitly[ClassTag[K]] == ClassTag.Int) =>
+
+        // Make sure closure sees only local vals, not attributes. We need to 
do these ugly casts
+        // around because compiler could not infer that K is the same as Int, 
based on if() above.
+        val ma = safeToNonNegInt(a.nrow)
+        val bAdjusted = new OpMapBlock[Int, Int](
+          A = pass1(b.asInstanceOf[DrmLike[Int]]),
+          bmf = {
+            case (keys, block) => keys.map(_ + ma) -> block
+          },
+          identicallyPartitioned = false
+        )
+        val aAdjusted = a.asInstanceOf[DrmLike[Int]]
+        OpRbind(pass1(aAdjusted), bAdjusted).asInstanceOf[DrmLike[K]]
+
       // Stop at checkpoints
       case cd: CheckpointedDrm[_] => action
 
@@ -152,6 +169,7 @@ object DistributedEngine {
       case OpAB(OpAt(a), b) => OpAtB(pass3(a), pass3(b))
       //      case OpAB(OpAt(a), b) => OpAt(OpABt(OpAt(pass1(b)), pass1(a)))
       case OpAB(a, b) => OpABt(pass3(a), OpAt(pass3(b)))
+
       // Rewrite A'x
       case op@OpAx(op1@OpAt(a), x) => OpAtx(pass3(a)(op1.classTagA), x)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/846307d5/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
index d7027f2..ae5da71 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
@@ -62,6 +62,8 @@ class RLikeDrmOps[K: ClassTag](drm: DrmLike[K]) extends 
DrmLikeOps[K](drm) {
   def t: DrmLike[Int] = OpAtAnyKey(A = drm)
 
   def cbind(that: DrmLike[K]) = OpCbind(A = this.drm, B = that)
+
+  def rbind(that: DrmLike[K]) = OpRbind(A = this.drm, B = that)
 }
 
 class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) {

http://git-wip-us.apache.org/repos/asf/mahout/blob/846307d5/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala
new file mode 100644
index 0000000..d45714b
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.drm.logical
+
+import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.DrmLike
+import scala.util.Random
+
+/** rbind() logical operator */
+case class OpRbind[K: ClassTag](
+    override var A: DrmLike[K],
+    override var B: DrmLike[K]
+    ) extends AbstractBinaryOp[K, K, K] {
+
+  assert(A.ncol == B.ncol, "arguments must have same number of columns")
+
+  override protected[mahout] lazy val partitioningTag: Long = Random.nextLong()
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long = A.nrow + B.nrow
+
+  /** R-like syntax for number of columns */
+  def ncol: Int = A.ncol
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/846307d5/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
 
b/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
index 50beccf..3f37bb9 100644
--- 
a/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
+++ 
b/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
@@ -488,5 +488,28 @@ trait RLikeDrmOpsSuiteBase extends DistributedMahoutSuite 
with Matchers {
 
     (drmB -: controlB).norm should be < 1e-10
   }
+  
+  test("C = A rbind B") {
 
+    val inCoreA = dense((1, 2), (3, 5))
+    val inCoreB = dense((7, 11), (13, 17))
+    val controlC = dense((1, 2), (3, 5), (7, 11), (13, 17))
+
+    val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint()
+    val B = drmParallelize(inCoreB, numPartitions = 2).checkpoint()
+    
+    (A.rbind(B) -: controlC).norm should be < 1e-10
+  }
+
+  test("C = A rbind B, with empty") {
+
+    val inCoreA = dense((1, 2), (3, 5))
+    val emptyB = drmParallelizeEmpty(nrow = 2, ncol = 2, numPartitions = 2)
+    val controlC = dense((1, 2), (3, 5), (0, 0), (0, 0))
+
+    val A = drmParallelize(inCoreA, numPartitions = 2).checkpoint()
+
+    (A.rbind(emptyB) -: controlC).norm should be < 1e-10
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/846307d5/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index c37354f..36223fc 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -253,6 +253,7 @@ object SparkEngine extends DistributedEngine {
       case op@OpAtx(a, x) => Ax.atx_with_broadcast(op, 
tr2phys(a)(op.classTagA))
       case op@OpAewB(a, b, opId) => AewB.a_ew_b(op, tr2phys(a)(op.classTagA), 
tr2phys(b)(op.classTagB))
       case op@OpCbind(a, b) => CbindAB.cbindAB_nograph(op, 
tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpRbind(a, b) => RbindAB.rbindAB(op, tr2phys(a)(op.classTagA), 
tr2phys(b)(op.classTagB))
       case op@OpAewScalar(a, s, _) => AewB.a_ew_scalar(op, 
tr2phys(a)(op.classTagA), s)
       case op@OpRowRange(a, _) => Slicing.rowRange(op, 
tr2phys(a)(op.classTagA))
       case op@OpTimesRightMatrix(a, _) => AinCoreB.rightMultiply(op, 
tr2phys(a)(op.classTagA))

http://git-wip-us.apache.org/repos/asf/mahout/blob/846307d5/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
new file mode 100644
index 0000000..5037d68
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.blas
+
+import org.apache.log4j.Logger
+import scala.reflect.ClassTag
+import org.apache.mahout.sparkbindings.drm.DrmRddInput
+import org.apache.mahout.math.drm.logical.OpRbind
+
+/** Physical `rbind` */
+object RbindAB {
+
+  private val log = Logger.getLogger(RbindAB.getClass)
+
+  def rbindAB[K: ClassTag](op: OpRbind[K], srcA: DrmRddInput[K], srcB: 
DrmRddInput[K]): DrmRddInput[K] = {
+
+    // If any of the inputs is blockified, use blockified inputs
+    if (srcA.isBlockified || srcB.isBlockified) {
+      val a = srcA.toBlockifiedDrmRdd()
+      val b = srcB.toBlockifiedDrmRdd()
+
+      // Union seems to be fine, it is indeed just do partition-level 
unionization, no shuffles
+      new DrmRddInput(blockifiedSrc = Some(a ++ b))
+
+    } else {
+
+      // Otherwise, use row-wise inputs -- no reason to blockify here.
+      val a = srcA.toDrmRdd()
+      val b = srcB.toDrmRdd()
+
+      new DrmRddInput(rowWiseSrc = Some(op.ncol -> (a ++ b)))
+    }
+  }
+}

Reply via email to