MAHOUT-1702: Flink: AewScalar replaced with OpAewUnaryFunc
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/f66477fc Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/f66477fc Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/f66477fc Branch: refs/heads/flink-binding Commit: f66477fc53581796035e1dfef06d57e12719f1c5 Parents: 8de8b79 Author: Alexey Grigorev <[email protected]> Authored: Wed Jun 24 14:56:08 2015 +0200 Committer: Alexey Grigorev <[email protected]> Committed: Fri Sep 25 17:41:54 2015 +0200 ---------------------------------------------------------------------- .../flinkbindings/blas/FlinkOpAewScalar.scala | 44 +++++++++++++++++--- 1 file changed, 39 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/f66477fc/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala index a1e1ab1..bf388b9 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala @@ -18,21 +18,28 @@ */ package org.apache.mahout.flinkbindings.blas +import scala.collection.JavaConversions._ import scala.reflect.ClassTag -import org.apache.mahout.math.drm.logical.OpAewScalar + +import org.apache.flink.api.common.functions.MapFunction +import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm import org.apache.mahout.flinkbindings.drm.FlinkDrm import org.apache.mahout.math.Matrix +import org.apache.mahout.math.drm.logical.OpAewScalar +import org.apache.mahout.math.drm.logical.OpAewUnaryFunc import org.apache.mahout.math.scalabindings._ -import RLikeOps._ -import org.apache.flink.api.common.functions.MapFunction -import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm +import org.apache.mahout.math.scalabindings.RLikeOps._ /** * Implementation is inspired by Spark-binding's OpAewScalar - * (see https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala) + * (see https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala) */ object FlinkOpAewScalar { + final val PROPERTY_AEWB_INPLACE = "mahout.math.AewB.inplace" + private def isInplace = System.getProperty(PROPERTY_AEWB_INPLACE, "false").toBoolean + + @Deprecated def opScalarNoSideEffect[K: ClassTag](op: OpAewScalar[K], A: FlinkDrm[K], scalar: Double): FlinkDrm[K] = { val function = EWOpsCloning.strToFunction(op.op) @@ -45,8 +52,35 @@ object FlinkOpAewScalar { new BlockifiedFlinkDrm(res, op.ncol) } + def opUnaryFunction[K: ClassTag](op: OpAewUnaryFunc[K], A: FlinkDrm[K], f: (Double) => Double): FlinkDrm[K] = { + val inplace = isInplace + + val res = if (op.evalZeros) { + A.blockify.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] { + def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = { + val (keys, block) = tuple + val newBlock = if (inplace) block else block.cloned + newBlock := ((_, _, x) => f(x)) + (keys, newBlock) + } + }) + } else { + A.blockify.ds.map(new MapFunction[(Array[K], Matrix), (Array[K], Matrix)] { + def map(tuple: (Array[K], Matrix)): (Array[K], Matrix) = { + val (keys, block) = tuple + val newBlock = if (inplace) block else block.cloned + for (row <- newBlock; el <- row.nonZeroes) el := f(el.get) + (keys, newBlock) + } + }) + } + + new BlockifiedFlinkDrm(res, op.ncol) + } + } +@Deprecated object EWOpsCloning { type MatrixScalarFunc = (Matrix, Double) => Matrix
