MAHOUT-1570: Flink: numNonZeroElementsPerColumn

Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/35426a96
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/35426a96
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/35426a96

Branch: refs/heads/flink-binding
Commit: 35426a96b98bb74538466318c5347d2f90415e97
Parents: d13f488
Author: Alexey Grigorev <[email protected]>
Authored: Tue Aug 25 17:31:39 2015 +0200
Committer: Alexey Grigorev <[email protected]>
Committed: Fri Sep 25 17:46:20 2015 +0200

----------------------------------------------------------------------
 .../mahout/flinkbindings/FlinkEngine.scala      | 25 +++++++++++++++++---
 .../mahout/flinkbindings/DrmLikeOpsSuite.scala  |  8 +++++++
 2 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/35426a96/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 35c6b76..ab35e78 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -19,10 +19,9 @@
 package org.apache.mahout.flinkbindings
 
 import java.util.Collection
-
 import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
-
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.java.tuple.Tuple2
@@ -171,7 +170,27 @@ object FlinkEngine extends DistributedEngine {
   }
 
   /** Engine-specific numNonZeroElementsPerColumn implementation based on a 
checkpoint. */
-  override def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = ???
+  override def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
+    val n = drm.ncol
+
+    // Count non-zeros per column within each block, then sum the per-block counts.
+    val result = drm.blockify.ds.map(new MapFunction[(Array[K], Matrix), Vector] {
+      def map(tuple: (Array[K], Matrix)): Vector = {
+        val (_, block) = tuple
+        // Sized from the block itself rather than block(0, ::), which fails on a zero-row block.
+        val acc: Vector = new org.apache.mahout.math.DenseVector(block.numCols())
+        for (r <- 0 until block.numRows(); el <- block.viewRow(r).nonZeroes().asScala) {
+          acc.setQuick(el.index(), acc.getQuick(el.index()) + 1)
+        }
+        acc
+      }
+    }).reduce(new ReduceFunction[Vector] {
+      def reduce(v1: Vector, v2: Vector): Vector = v1 + v2
+    })
+
+    // reduce() over an empty DataSet produces no element; report all-zero counts instead of failing.
+    result.collect.asScala.headOption.getOrElse(new org.apache.mahout.math.DenseVector(n))
+  }
 
   /** 
    * returns a vector that contains a column-wise mean from DRM 

http://git-wip-us.apache.org/repos/asf/mahout/blob/35426a96/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
index 4c75afa..83d7f43 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
@@ -62,6 +62,14 @@ class DrmLikeOpsSuite extends FunSuite with DistributedFlinkSuite {
     (inCoreA.rowMeans - A.rowMeans).norm(2) should be < 1e-6
   }
 
+  test("numNonZeroElementsPerColumn") {
+    val inCoreA = dense((0, 2), (3, 0), (0, -30))
+    val A = drmParallelize(inCoreA, numPartitions = 2)
+
+    // The distributed per-column non-zero counts must agree with the in-core result.
+    A.numNonZeroElementsPerColumn() should equal(inCoreA.numNonZeroElementsPerColumn())
+  }
+
   test("drmParallelizeEmpty") {
     val emptyDrm = drmParallelizeEmpty(nrow = 2, ncol = 2, numPartitions = 2)
     val expected = dense((0, 0), (0, 0))

Reply via email to