MAHOUT-1701: Flink: AtB bug fixed
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/fcc6cf17 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/fcc6cf17 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/fcc6cf17 Branch: refs/heads/flink-binding Commit: fcc6cf17a7820de9d0edd6bbca023086c84913ee Parents: 2a4e969 Author: Alexey Grigorev <[email protected]> Authored: Tue Aug 25 14:07:29 2015 +0200 Committer: Alexey Grigorev <[email protected]> Committed: Fri Sep 25 17:44:49 2015 +0200 ---------------------------------------------------------------------- .../apache/mahout/flinkbindings/blas/FlinkOpAtB.scala | 2 +- .../apache/mahout/flinkbindings/RLikeOpsSuite.scala | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/fcc6cf17/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala index f02cd84..c54e6de 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala @@ -70,7 +70,7 @@ object FlinkOpAtB { 0.until(blockCount) map { blockKey => val blockStart = blockKey * blockHeight - val blockEnd = Math.min(ncol, blockStart + blockHeight) + val blockEnd = Math.min(nrow.toInt, blockStart + blockHeight) // Create block by cross product of proper slice of aRow and qRow val outer = avec(blockStart until blockEnd) cross bvec http://git-wip-us.apache.org/repos/asf/mahout/blob/fcc6cf17/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala index 77ecaf4..800218b 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala @@ -115,6 +115,20 @@ class RLikeOpsSuite extends FunSuite with DistributedFlinkSuite { assert((res.collect - expected).norm < 1e-6) } + test("ABt test") { + val mxX = dense((1, 2), (2, 3), (3, 4), (5, 6), (7, 8)) + val mxY = dense((1, 2), (2, 3), (3, 4), (5, 6), (7, 8), + (1, 2), (2, 3), (3, 4), (5, 6), (7, 8)) + + val drmX = drmParallelize(mxX, 3) + val drmY = drmParallelize(mxY, 4) + + val XYt = (drmX %*% drmY.t).collect + val control = mxX %*% mxY.t + (XYt - control).norm should be < 1e-7 + } + + test("A * scalar") { val inCoreA = dense((1, 2), (2, 3), (3, 4)) val A = drmParallelize(m = inCoreA, numPartitions = 2)
