MAHOUT-1702: Flink: unary functions
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/2a4e9690 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/2a4e9690 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/2a4e9690 Branch: refs/heads/flink-binding Commit: 2a4e9690c82fa341741f52fa7ccb64cab8ea66df Parents: f26245b Author: Alexey Grigorev <[email protected]> Authored: Mon Aug 24 14:31:14 2015 +0200 Committer: Alexey Grigorev <[email protected]> Committed: Fri Sep 25 17:44:21 2015 +0200 ---------------------------------------------------------------------- .../org/apache/mahout/flinkbindings/FlinkEngine.scala | 10 +++++++--- .../mahout/flinkbindings/blas/FlinkOpAewScalar.scala | 6 ++++-- 2 files changed, 11 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/2a4e9690/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala index c7bea7b..a21591d 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala @@ -79,6 +79,7 @@ import org.apache.mahout.math.scalabindings._ import org.apache.mahout.math.scalabindings.RLikeOps._ import org.apache.mahout.flinkbindings.blas.FlinkOpAtA import org.apache.mahout.math.drm.logical.OpCbindScalar +import org.apache.mahout.math.drm.logical.OpAewUnaryFuncFusion object FlinkEngine extends DistributedEngine { @@ -168,8 +169,10 @@ object FlinkEngine extends DistributedEngine { case op @ OpAtA(a) => FlinkOpAtA.at_a(op, flinkTranslate(a)(op.classTagA)) case op @ OpTimesRightMatrix(a, b) => FlinkOpTimesRightMatrix.drmTimesInCore(op, flinkTranslate(a)(op.classTagA), b) - case op @ OpAewUnaryFunc(a, f, _) => - FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a)(op.classTagA), f) + case op @ OpAewUnaryFunc(a, _, _) => + FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a)(op.classTagA)) + case op @ OpAewUnaryFuncFusion(a, _) => + FlinkOpAewScalar.opUnaryFunction(op, flinkTranslate(a)(op.classTagA)) case op @ OpAewScalar(a, scalar, _) => FlinkOpAewScalar.opScalarNoSideEffect(op, flinkTranslate(a)(op.classTagA), scalar) case op @ OpAewB(a, b, _) => @@ -282,6 +285,7 @@ object FlinkEngine extends DistributedEngine { def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = ??? /** Optional engine-specific all reduce tensor operation. */ - def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix = ??? + def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix = + throw new UnsupportedOperationException("the operation allreduceBlock is not yet supported on Flink") } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/2a4e9690/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala index bf388b9..a97f8c8 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala @@ -20,7 +20,6 @@ package org.apache.mahout.flinkbindings.blas import scala.collection.JavaConversions._ import scala.reflect.ClassTag - import org.apache.flink.api.common.functions.MapFunction import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm import org.apache.mahout.flinkbindings.drm.FlinkDrm @@ -29,6 +28,8 @@ import org.apache.mahout.math.drm.logical.OpAewScalar import org.apache.mahout.math.drm.logical.OpAewUnaryFunc import org.apache.mahout.math.scalabindings._ import org.apache.mahout.math.scalabindings.RLikeOps._ +import org.apache.mahout.math.drm.logical.AbstractUnaryOp +import org.apache.mahout.math.drm.logical.TEwFunc /** * Implementation is inspired by Spark-binding's OpAewScalar @@ -52,7 +53,8 @@ object FlinkOpAewScalar { new BlockifiedFlinkDrm(res, op.ncol) } - def opUnaryFunction[K: ClassTag](op: OpAewUnaryFunc[K], A: FlinkDrm[K], f: (Double) => Double): FlinkDrm[K] = { + def opUnaryFunction[K: ClassTag](op: AbstractUnaryOp[K, K] with TEwFunc, A: FlinkDrm[K]): FlinkDrm[K] = { + val f = op.f val inplace = isInplace val res = if (op.evalZeros) {
