Repository: mahout
Updated Branches:
  refs/heads/flink-binding 93ebed620 -> b692cbfcc
MAHOUT-1805: implement allreduceBlock in Flink closes apache/mahout#193

Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/b692cbfc
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/b692cbfc
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/b692cbfc

Branch: refs/heads/flink-binding
Commit: b692cbfcc56871c85ea1c483e8d78eed21b514b8
Parents: 93ebed6
Author: Andrew Palumbo <[email protected]>
Authored: Tue Mar 15 15:40:30 2016 -0400
Committer: Andrew Palumbo <[email protected]>
Committed: Tue Mar 15 15:40:30 2016 -0400

----------------------------------------------------------------------
 .../mahout/flinkbindings/FlinkEngine.scala | 20 +++++++-------------
 1 file changed, 7 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/mahout/blob/b692cbfc/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 8db063b..616512e 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -360,20 +360,14 @@ object FlinkEngine extends DistributedEngine {
     new CheckpointedFlinkDrm[K](sample)
   }
 
-  /** Optional engine-specific all reduce tensor operation. */
-  def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix =
-    throw new UnsupportedOperationException("the operation allreduceBlock is not yet supported on Flink")
-
-// private def generateTypeInformation[K]: TypeInformation[K] = {
-// val tag = implicitly[K].asInstanceOf[ClassTag[K]]
-// generateTypeInformationFromTag(tag)
-// }
-  private def generateTypeInformation[K: ClassTag]: TypeInformation[K] = {
-    val tag = implicitly[ClassTag[K]]
-
-    generateTypeInformationFromTag(tag)
-  }
+  /** Engine-specific all reduce tensor operation. */
+  def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix = {
+    implicit val kTag: ClassTag[K] = drm.keyClassTag
+    implicit val typeInformation = generateTypeInformation[K]
+    val res = drm.asBlockified.ds.map(par => bmf(par)).reduce(rf)
+  }
+
   private def generateTypeInformationFromTag[K](tag: ClassTag[K]): TypeInformation[K] = {
     if (tag.runtimeClass.equals(classOf[Int])) {
       createTypeInformation[Int].asInstanceOf[TypeInformation[K]]
