Some minor fixes, this closes apache/mahout#202
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/e3c8db50 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/e3c8db50 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/e3c8db50 Branch: refs/heads/master Commit: e3c8db502b5e7cf926c2628f413bd5aaa69b2765 Parents: 202b94f Author: smarthi <[email protected]> Authored: Fri Mar 25 04:03:05 2016 -0400 Committer: smarthi <[email protected]> Committed: Fri Mar 25 04:03:05 2016 -0400 ---------------------------------------------------------------------- .../mahout/flinkbindings/FlinkEngine.scala | 2 - .../mahout/flinkbindings/blas/FlinkOpAtB.scala | 43 ++++++++------------ .../mahout/flinkbindings/blas/FlinkOpAx.scala | 11 ++--- .../apache/mahout/flinkbindings/package.scala | 2 +- .../mahout/flinkbindings/DrmLikeOpsSuite.scala | 10 +---- .../mahout/flinkbindings/UseCasesSuite.scala | 16 ++++---- .../mahout/flinkbindings/blas/LATestSuite.scala | 21 ++++------ 7 files changed, 41 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/e3c8db50/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala index 843a4a9..c355cae 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala @@ -370,8 +370,6 @@ object FlinkEngine extends DistributedEngine { createTypeInformation[Long].asInstanceOf[TypeInformation[K]] } else if (tag.runtimeClass.equals(classOf[String])) { createTypeInformation[String].asInstanceOf[TypeInformation[K]] -// } else if (tag.runtimeClass.equals(classOf[Any])) { -// createTypeInformation[Any].asInstanceOf[TypeInformation[K]] } else { throw new IllegalArgumentException(s"index type $tag is not supported") } http://git-wip-us.apache.org/repos/asf/mahout/blob/e3c8db50/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala index ac1e73a..0a2683c 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala @@ -20,25 +20,18 @@ package org.apache.mahout.flinkbindings.blas import java.lang.Iterable -import scala.collection.JavaConverters.asScalaBufferConverter -import scala.reflect.ClassTag - -import org.apache.flink.api.common.functions.FlatMapFunction -import org.apache.flink.api.common.functions.GroupReduceFunction -import org.apache.flink.api.scala.DataSet +import com.google.common.collect.Lists +import org.apache.flink.api.common.functions.{FlatMapFunction, GroupReduceFunction} +import org.apache.flink.api.scala._ import org.apache.flink.util.Collector import org.apache.mahout.flinkbindings._ -import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm -import org.apache.mahout.flinkbindings.drm.FlinkDrm -import org.apache.mahout.math.Matrix -import org.apache.mahout.math.Vector +import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm} +import org.apache.mahout.math.{Matrix, Vector} import org.apache.mahout.math.drm._ import org.apache.mahout.math.drm.logical.OpAtB import org.apache.mahout.math.scalabindings.RLikeOps._ -import com.google.common.collect.Lists - -import org.apache.flink.api.scala._ +import scala.collection.JavaConverters.asScalaBufferConverter /** * Implementation is taken from Spark's AtB @@ -47,7 +40,6 @@ import org.apache.flink.api.scala._ object FlinkOpAtB { def notZippable[A](op: OpAtB[A], At: FlinkDrm[A], B: FlinkDrm[A]): FlinkDrm[Int] = { - val rowsAt = At.asRowWise.ds.asInstanceOf[DrmDataSet[A]] val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[A]] val joined = rowsAt.join(rowsB).where(0).equalTo(0) @@ -75,20 +67,21 @@ object FlinkOpAtB { }) val res: BlockifiedDrmDataSet[Int] = - preProduct.groupBy(0).reduceGroup(new GroupReduceFunction[(Int, Matrix), BlockifiedDrmTuple[Int]] { - def reduce(values: Iterable[(Int, Matrix)], out: Collector[BlockifiedDrmTuple[Int]]): Unit = { - val it = Lists.newArrayList(values).asScala - val (idx, _) = it.head + preProduct.groupBy(0).reduceGroup( + new GroupReduceFunction[(Int, Matrix), BlockifiedDrmTuple[Int]] { + def reduce(values: Iterable[(Int, Matrix)], out: Collector[BlockifiedDrmTuple[Int]]): Unit = { + val it = Lists.newArrayList(values).asScala + val (idx, _) = it.head - val block = it.map { t => t._2 }.reduce { (m1, m2) => m1 + m2 } - - val blockStart = idx * blockHeight - val keys = Array.tabulate(block.nrow)(blockStart + _) + val block = it.map { t => t._2 }.reduce { (m1, m2) => m1 + m2 } - out.collect(keys -> block) - } - }) + val blockStart = idx * blockHeight + val keys = Array.tabulate(block.nrow)(blockStart + _) + out.collect(keys -> block) + } + } + ) new BlockifiedFlinkDrm[Int](res, ncol) } http://git-wip-us.apache.org/repos/asf/mahout/blob/e3c8db50/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala index 79f5fe8..ec20b6d 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala @@ -18,19 +18,16 @@ */ package org.apache.mahout.flinkbindings.blas -import java.util.List +import java.util import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm} -import org.apache.mahout.math.{Matrix, Vector} import org.apache.mahout.math.drm.logical.OpAx import org.apache.mahout.math.scalabindings.RLikeOps._ - -import scala.reflect.ClassTag - -import org.apache.flink.api.scala._ +import org.apache.mahout.math.{Matrix, Vector} /** @@ -50,7 +47,7 @@ object FlinkOpAx { override def open(params: Configuration): Unit = { val runtime = this.getRuntimeContext - val dsX: List[Vector] = runtime.getBroadcastVariable("vector") + val dsX: util.List[Vector] = runtime.getBroadcastVariable("vector") x = dsX.get(0) } http://git-wip-us.apache.org/repos/asf/mahout/blob/e3c8db50/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala index f0dd620..10ce545 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala @@ -38,7 +38,7 @@ package object flinkbindings { type DrmDataSet[K] = DataSet[DrmTuple[K]] /** - * Blockifed DRM dataset (keys of original DRM are grouped into array corresponding to rows of Matrix + * Blockified DRM dataset (keys of original DRM are grouped into array corresponding to rows of Matrix * object value */ type BlockifiedDrmDataSet[K] = DataSet[BlockifiedDrmTuple[K]] http://git-wip-us.apache.org/repos/asf/mahout/blob/e3c8db50/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala index 725f31a..fe2277c 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala @@ -18,17 +18,11 @@ */ package org.apache.mahout.flinkbindings -import org.apache.mahout.flinkbindings._ -import org.apache.mahout.math._ -import org.apache.mahout.math.drm._ import org.apache.mahout.math.drm.RLikeDrmOps._ -import org.apache.mahout.math.scalabindings._ +import org.apache.mahout.math.drm._ import org.apache.mahout.math.scalabindings.RLikeOps._ -import org.junit.runner.RunWith +import org.apache.mahout.math.scalabindings._ import org.scalatest.FunSuite -import org.scalatest.junit.JUnitRunner -import org.slf4j.Logger -import org.slf4j.LoggerFactory class DrmLikeOpsSuite extends FunSuite with DistributedFlinkSuite { http://git-wip-us.apache.org/repos/asf/mahout/blob/e3c8db50/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala index 0a5f145..fa49114 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala @@ -18,18 +18,16 @@ */ package org.apache.mahout.flinkbindings -import scala.util.hashing.MurmurHash3 -import org.apache.mahout.math.Matrices -import org.apache.mahout.math.Vector -import org.apache.mahout.math.drm._ +import org.apache.mahout.math.{Matrices, Vector} import org.apache.mahout.math.drm.RLikeDrmOps._ -import org.apache.mahout.math.scalabindings._ -import org.apache.mahout.math.scalabindings.RLikeOps._ +import org.apache.mahout.math.drm._ import org.apache.mahout.math.function.IntIntFunction -import org.junit.runner.RunWith -import org.slf4j.LoggerFactory +import org.apache.mahout.math.scalabindings.RLikeOps._ +import org.apache.mahout.math.scalabindings._ import org.scalatest.FunSuite -import org.scalatest.junit.JUnitRunner +import org.slf4j.LoggerFactory + +import scala.util.hashing.MurmurHash3 class UseCasesSuite extends FunSuite with DistributedFlinkSuite { http://git-wip-us.apache.org/repos/asf/mahout/blob/e3c8db50/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala index 81ca737..95d0969 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala @@ -18,18 +18,15 @@ */ package org.apache.mahout.flinkbindings.blas -import org.scalatest.FunSuite -import org.apache.mahout.math._ -import scalabindings._ -import RLikeOps._ -import drm._ import org.apache.flink.api.scala._ import org.apache.mahout.flinkbindings._ -import org.junit.runner.RunWith -import org.scalatest.junit.JUnitRunner -import org.apache.mahout.math.drm.logical.OpAx import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm -import org.apache.mahout.math.drm.logical._ +import org.apache.mahout.math._ +import org.apache.mahout.math.drm._ +import org.apache.mahout.math.drm.logical.{OpAx, _} +import org.apache.mahout.math.scalabindings.RLikeOps._ +import org.apache.mahout.math.scalabindings._ +import org.scalatest.FunSuite class LATestSuite extends FunSuite with DistributedFlinkSuite { @@ -115,7 +112,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite { val res = FlinkOpCBind.cbind(op, A, B) val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow, - _ncol=(inCoreA.ncol + inCoreB.ncol)) + _ncol= inCoreA.ncol + inCoreB.ncol) val output = drm.collect val expected = dense((1, 2, 4, 4), (2, 3, 5, 5), (3, 4, 6, 7)) @@ -130,7 +127,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite { val res = FlinkOpCBind.cbindScalar(op, A, 1) val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow, - _ncol=(inCoreA.ncol + 1)) + _ncol= inCoreA.ncol + 1) val output = drm.collect val expected = dense((1, 1, 2), (1, 2, 3), (1, 3, 4)) @@ -145,7 +142,7 @@ class LATestSuite extends FunSuite with DistributedFlinkSuite { val res = FlinkOpCBind.cbindScalar(op, A, 1) val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow, - _ncol=(inCoreA.ncol + 1)) + _ncol= inCoreA.ncol + 1) val output = drm.collect val expected = dense((1, 2, 1), (2, 3, 1), (3, 4, 1))
