Repository: mahout Updated Branches: refs/heads/flink-binding 1d9b6322e -> e943b0a0d
Removed unused imports Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/e943b0a0 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/e943b0a0 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/e943b0a0 Branch: refs/heads/flink-binding Commit: e943b0a0df28b20cf93aba8778a02eb68fead6c4 Parents: 1d9b632 Author: smarthi <[email protected]> Authored: Sat Oct 24 23:51:15 2015 -0400 Committer: smarthi <[email protected]> Committed: Sat Oct 24 23:51:15 2015 -0400 ---------------------------------------------------------------------- .../org/apache/mahout/flinkbindings/FlinkEngine.scala | 14 ++++---------- .../apache/mahout/flinkbindings/drm/FlinkDrm.scala | 5 ++--- 2 files changed, 6 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/e943b0a0/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala index 9820b86..5915c0a 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala @@ -28,8 +28,6 @@ import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.java.tuple.Tuple2 import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.hadoop.io.Writable -import org.apache.hadoop.mapred.FileInputFormat -import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapred.SequenceFileInputFormat import org.apache.mahout.flinkbindings.blas._ import org.apache.mahout.flinkbindings.drm._ @@ -101,20 +99,19 @@ object FlinkEngine extends DistributedEngine { private def flinkTranslate[K: ClassTag](oper: DrmLike[K]): FlinkDrm[K] = oper match { case op @ OpAx(a, x) => FlinkOpAx.blockifiedBroadcastAx(op, flinkTranslate(a)(op.classTagA)) case op @ OpAt(a) => FlinkOpAt.sparseTrick(op, flinkTranslate(a)(op.classTagA)) - case op @ OpAtx(a, x) => { + case op @ OpAtx(a, x) => // express Atx as (A.t) %*% x - // TODO: create specific implementation of Atx, see MAHOUT-1749 + // TODO: create specific implementation of Atx, see MAHOUT-1749 val opAt = OpAt(a) val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a)(op.classTagA)) val atCast = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow=opAt.nrow, _ncol=opAt.ncol) val opAx = OpAx(atCast, x) FlinkOpAx.blockifiedBroadcastAx(opAx, flinkTranslate(atCast)(op.classTagA)) - } case op @ OpAtB(a, b) => FlinkOpAtB.notZippable(op, flinkTranslate(a)(op.classTagA), flinkTranslate(b)(op.classTagA)) - case op @ OpABt(a, b) => { + case op @ OpABt(a, b) => // express ABt via AtB: let C=At and D=Bt, and calculate CtD - // TODO: create specific implementation of ABt, see MAHOUT-1750 + // TODO: create specific implementation of ABt, see MAHOUT-1750 val opAt = OpAt(a.asInstanceOf[DrmLike[Int]]) // TODO: casts! val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]])) val c = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow=opAt.nrow, _ncol=opAt.ncol) @@ -125,7 +122,6 @@ object FlinkEngine extends DistributedEngine { FlinkOpAtB.notZippable(OpAtB(c, d), flinkTranslate(c), flinkTranslate(d)) .asInstanceOf[FlinkDrm[K]] - } case op @ OpAtA(a) => FlinkOpAtA.at_a(op, flinkTranslate(a)(op.classTagA)) case op @ OpTimesRightMatrix(a, b) => FlinkOpTimesRightMatrix.drmTimesInCore(op, flinkTranslate(a)(op.classTagA), b) @@ -170,8 +166,6 @@ object FlinkEngine extends DistributedEngine { /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */ override def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = { - val n = drm.ncol - val result = drm.asBlockified.ds.map(new MapFunction[(Array[K], Matrix), Vector] { def map(tuple: (Array[K], Matrix)): Vector = { val (_, block) = tuple http://git-wip-us.apache.org/repos/asf/mahout/blob/e943b0a0/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala index d00a335..4a16724 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala @@ -56,7 +56,7 @@ class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends Fl def isBlockified = false - def asBlockified(): BlockifiedFlinkDrm[K] = { + def asBlockified : BlockifiedFlinkDrm[K] = { val ncolLocal = ncol val classTag = implicitly[ClassTag[K]] @@ -100,9 +100,8 @@ class BlockifiedFlinkDrm[K: ClassTag](val ds: BlockifiedDrmDataSet[K], val ncol: val out = ds.flatMap(new FlatMapFunction[(Array[K], Matrix), DrmTuple[K]] { def flatMap(typle: (Array[K], Matrix), out: Collector[DrmTuple[K]]): Unit = typle match { case (keys, block) => keys.view.zipWithIndex.foreach { - case (key, idx) => { + case (key, idx) => out.collect((key, block(idx, ::))) - } } } })
