MAHOUT-1702: Flink: unary functions

Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/2a4e9690
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/2a4e9690
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/2a4e9690

Branch: refs/heads/flink-binding
Commit: 2a4e9690c82fa341741f52fa7ccb64cab8ea66df
Parents: f26245b
Author: Alexey Grigorev <[email protected]>
Authored: Mon Aug 24 14:31:14 2015 +0200
Committer: Alexey Grigorev <[email protected]>
Committed: Fri Sep 25 17:44:21 2015 +0200

----------------------------------------------------------------------
 .../org/apache/mahout/flinkbindings/FlinkEngine.scala     | 10 +++++++---
 .../mahout/flinkbindings/blas/FlinkOpAewScalar.scala      |  6 ++++--
 2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/2a4e9690/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index c7bea7b..a21591d 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -79,6 +79,7 @@ import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.math.scalabindings.RLikeOps._
 import org.apache.mahout.flinkbindings.blas.FlinkOpAtA
 import org.apache.mahout.math.drm.logical.OpCbindScalar
+import org.apache.mahout.math.drm.logical.OpAewUnaryFuncFusion
 
 object FlinkEngine extends DistributedEngine {
 
@@ -168,8 +169,10 @@ object FlinkEngine extends DistributedEngine {
     case op @ OpAtA(a) => FlinkOpAtA.at_a(op, flinkTranslate(a)(op.classTagA))
     case op @ OpTimesRightMatrix(a, b) => 
       FlinkOpTimesRightMatrix.drmTimesInCore(op, flinkTranslate(a)(op.classTagA), b)
-    case op @ OpAewUnaryFunc(a, f, _) =>
-      FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a)(op.classTagA), f)
+    case op @ OpAewUnaryFunc(a, _, _) =>
+      FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a)(op.classTagA))
+    case op @ OpAewUnaryFuncFusion(a, _) => 
+      FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a)(op.classTagA))
     case op @ OpAewScalar(a, scalar, _) => 
       FlinkOpAewScalar.opScalarNoSideEffect(op, flinkTranslate(a)(op.classTagA), scalar)
     case op @ OpAewB(a, b, _) =>
@@ -282,6 +285,7 @@ object FlinkEngine extends DistributedEngine {
   def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = ???
 
   /** Optional engine-specific all reduce tensor operation. */
-  def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix = ???
+  def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix = 
+    throw new UnsupportedOperationException("the operation allreduceBlock is not yet supported on Flink")
  
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/2a4e9690/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
index bf388b9..a97f8c8 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala
@@ -20,7 +20,6 @@ package org.apache.mahout.flinkbindings.blas
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
-
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
 import org.apache.mahout.flinkbindings.drm.FlinkDrm
@@ -29,6 +28,8 @@ import org.apache.mahout.math.drm.logical.OpAewScalar
 import org.apache.mahout.math.drm.logical.OpAewUnaryFunc
 import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.drm.logical.AbstractUnaryOp
+import org.apache.mahout.math.drm.logical.TEwFunc
 
 /**
  * Implementation is inspired by Spark-binding's OpAewScalar
@@ -52,7 +53,8 @@ object FlinkOpAewScalar {
     new BlockifiedFlinkDrm(res, op.ncol)
   }
 
-  def opUnaryFunction[K: ClassTag](op: OpAewUnaryFunc[K], A: FlinkDrm[K], f: (Double) => Double): FlinkDrm[K] = {
+  def opUnaryFunction[K: ClassTag](op: AbstractUnaryOp[K, K] with TEwFunc, A: FlinkDrm[K]): FlinkDrm[K] = {
+    val f = op.f
     val inplace = isInplace
 
     val res = if (op.evalZeros) {

Reply via email to