Repository: mahout Updated Branches: refs/heads/flink-binding 7b862781c -> 2e8790d5c
MAHOUT-1815: dsqDist(X,Y) and dsqDist(X) failing in flink tests. closes apache/mahout#197 Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/2e8790d5 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/2e8790d5 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/2e8790d5 Branch: refs/heads/flink-binding Commit: 2e8790d5c6e0f337abe55906e052d7236f046207 Parents: 7b86278 Author: Andrew Palumbo <[email protected]> Authored: Thu Mar 17 18:33:10 2016 -0400 Committer: Andrew Palumbo <[email protected]> Committed: Thu Mar 17 18:33:10 2016 -0400 ---------------------------------------------------------------------- .../scala/org/apache/mahout/flinkbindings/FlinkEngine.scala | 4 ++-- .../scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/2e8790d5/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala index 958b6cf..f1e06d0 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala @@ -152,10 +152,10 @@ object FlinkEngine extends DistributedEngine { // express ABt via AtB: let C=At and D=Bt, and calculate CtD // TODO: create specific implementation of ABt, see MAHOUT-1750 val opAt = OpAt(a.asInstanceOf[DrmLike[Int]]) // TODO: casts! 
- val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]])) + val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]])) val c = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow = opAt.nrow, _ncol = opAt.ncol) val opBt = OpAt(b.asInstanceOf[DrmLike[Int]]) // TODO: casts! - val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]])) + val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]])) val d = new CheckpointedFlinkDrm(bt.asRowWise.ds, _nrow = opBt.nrow, _ncol = opBt.ncol) FlinkOpAtB.notZippable(OpAtB(c, d), flinkTranslate(c), flinkTranslate(d)).asInstanceOf[FlinkDrm[K]] case op@OpAtA(a) if op.keyClassTag == ClassTag.Int ⇒ FlinkOpAtA.at_a(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]] http://git-wip-us.apache.org/repos/asf/mahout/blob/2e8790d5/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala index 6a081ba..ac1e73a 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala @@ -81,12 +81,15 @@ object FlinkOpAtB { val (idx, _) = it.head val block = it.map { t => t._2 }.reduce { (m1, m2) => m1 + m2 } + + val blockStart = idx * blockHeight + val keys = Array.tabulate(block.nrow)(blockStart + _) - val keys = idx.until(block.nrow).toArray[Int] out.collect(keys -> block) } }) + new BlockifiedFlinkDrm[Int](res, ncol) }
