Repository: mahout
Updated Branches:
  refs/heads/flink-binding 93ebed620 -> b692cbfcc


MAHOUT-1805: implement allreduceBlock in Flink closes apache/mahout#193


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/b692cbfc
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/b692cbfc
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/b692cbfc

Branch: refs/heads/flink-binding
Commit: b692cbfcc56871c85ea1c483e8d78eed21b514b8
Parents: 93ebed6
Author: Andrew Palumbo <[email protected]>
Authored: Tue Mar 15 15:40:30 2016 -0400
Committer: Andrew Palumbo <[email protected]>
Committed: Tue Mar 15 15:40:30 2016 -0400

----------------------------------------------------------------------
 .../mahout/flinkbindings/FlinkEngine.scala      | 20 +++++++-------------
 1 file changed, 7 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/b692cbfc/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 8db063b..616512e 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -360,20 +360,14 @@ object FlinkEngine extends DistributedEngine {
     new CheckpointedFlinkDrm[K](sample)
   }
 
-  /** Optional engine-specific all reduce tensor operation. */
-  def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: 
BlockReduceFunc): Matrix =
-    throw new UnsupportedOperationException("the operation allreduceBlock is 
not yet supported on Flink")
-
-//  private def generateTypeInformation[K]: TypeInformation[K] = {
-//    val tag = implicitly[K].asInstanceOf[ClassTag[K]]
-//    generateTypeInformationFromTag(tag)
-//  }
-  private def generateTypeInformation[K: ClassTag]: TypeInformation[K] = {
-    val tag = implicitly[ClassTag[K]]
-
-    generateTypeInformationFromTag(tag)
-  }
+  /** Engine-specific all reduce tensor operation. */
+  def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: 
BlockReduceFunc): Matrix = {
+    implicit val kTag: ClassTag[K] = drm.keyClassTag
+    implicit val typeInformation = generateTypeInformation[K]
 
+    val res = drm.asBlockified.ds.map(par => bmf(par)).reduce(rf)
+  }
+  
   private def generateTypeInformationFromTag[K](tag: ClassTag[K]): 
TypeInformation[K] = {
     if (tag.runtimeClass.equals(classOf[Int])) {
       createTypeInformation[Int].asInstanceOf[TypeInformation[K]]

Reply via email to