MAHOUT-1570: Flink: numNonZeroElementsPerColumn
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/35426a96
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/35426a96
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/35426a96

Branch: refs/heads/flink-binding
Commit: 35426a96b98bb74538466318c5347d2f90415e97
Parents: d13f488
Author: Alexey Grigorev <[email protected]>
Authored: Tue Aug 25 17:31:39 2015 +0200
Committer: Alexey Grigorev <[email protected]>
Committed: Fri Sep 25 17:46:20 2015 +0200

----------------------------------------------------------------------
 .../mahout/flinkbindings/FlinkEngine.scala      | 25 +++++++++++++++++---
 .../mahout/flinkbindings/DrmLikeOpsSuite.scala  |  8 +++++++
 2 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/mahout/blob/35426a96/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 35c6b76..ab35e78 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -19,10 +19,9 @@
 package org.apache.mahout.flinkbindings
 
 import java.util.Collection
-
 import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
-
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.java.tuple.Tuple2
@@ -171,7 +170,27 @@ object FlinkEngine extends DistributedEngine {
   }
 
   /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */
-  override def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = ???
+  override def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
+    val n = drm.ncol
+
+    val result = drm.blockify.ds.map(new MapFunction[(Array[K], Matrix), Vector] {
+      def map(tuple: (Array[K], Matrix)): Vector = {
+        val (_, block) = tuple
+        val acc = block(0, ::).like()
+
+        block.foreach { v =>
+          v.nonZeroes().foreach { el => acc(el.index()) = acc(el.index()) + 1 }
+        }
+
+        acc
+      }
+    }).reduce(new ReduceFunction[Vector] {
+      def reduce(v1: Vector, v2: Vector) = v1 + v2
+    })
+
+    val list = result.collect.asScala.toList
+    list.head
+  }
 
   /**
    * returns a vector that contains a column-wise mean from DRM

http://git-wip-us.apache.org/repos/asf/mahout/blob/35426a96/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
index 4c75afa..83d7f43 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
@@ -62,6 +62,14 @@ class DrmLikeOpsSuite extends FunSuite with DistributedFlinkSuite {
     (inCoreA.rowMeans - A.rowMeans).norm(2) should be < 1e-6
   }
 
+  test("numNonZeroElementsPerColumn") {
+    val A = dense((0, 2), (3, 0), (0, -30))
+    val drmA = drmParallelize(A, numPartitions = 2)
+
+    drmA.numNonZeroElementsPerColumn() should equal(A.numNonZeroElementsPerColumn())
+  }
+
+
   test("drmParallelizeEmpty") {
     val emptyDrm = drmParallelizeEmpty(nrow = 2, ncol = 2, numPartitions = 2)
     val expected = dense((0, 0), (0, 0))
