Some minor fixes, this closes apache/mahout#202

Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/e3c8db50
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/e3c8db50
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/e3c8db50

Branch: refs/heads/master
Commit: e3c8db502b5e7cf926c2628f413bd5aaa69b2765
Parents: 202b94f
Author: smarthi <[email protected]>
Authored: Fri Mar 25 04:03:05 2016 -0400
Committer: smarthi <[email protected]>
Committed: Fri Mar 25 04:03:05 2016 -0400

----------------------------------------------------------------------
 .../mahout/flinkbindings/FlinkEngine.scala      |  2 -
 .../mahout/flinkbindings/blas/FlinkOpAtB.scala  | 43 ++++++++------------
 .../mahout/flinkbindings/blas/FlinkOpAx.scala   | 11 ++---
 .../apache/mahout/flinkbindings/package.scala   |  2 +-
 .../mahout/flinkbindings/DrmLikeOpsSuite.scala  | 10 +----
 .../mahout/flinkbindings/UseCasesSuite.scala    | 16 ++++----
 .../mahout/flinkbindings/blas/LATestSuite.scala | 21 ++++------
 7 files changed, 41 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/e3c8db50/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 843a4a9..c355cae 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -370,8 +370,6 @@ object FlinkEngine extends DistributedEngine {
       createTypeInformation[Long].asInstanceOf[TypeInformation[K]]
     } else if (tag.runtimeClass.equals(classOf[String])) {
       createTypeInformation[String].asInstanceOf[TypeInformation[K]]
-//    } else if (tag.runtimeClass.equals(classOf[Any])) {
-//       createTypeInformation[Any].asInstanceOf[TypeInformation[K]]
     } else {
       throw new IllegalArgumentException(s"index type $tag is not supported")
     }

http://git-wip-us.apache.org/repos/asf/mahout/blob/e3c8db50/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
index ac1e73a..0a2683c 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala
@@ -20,25 +20,18 @@ package org.apache.mahout.flinkbindings.blas
 
 import java.lang.Iterable
 
-import scala.collection.JavaConverters.asScalaBufferConverter
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.functions.GroupReduceFunction
-import org.apache.flink.api.scala.DataSet
+import com.google.common.collect.Lists
+import org.apache.flink.api.common.functions.{FlatMapFunction, 
GroupReduceFunction}
+import org.apache.flink.api.scala._
 import org.apache.flink.util.Collector
 import org.apache.mahout.flinkbindings._
-import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.FlinkDrm
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.Vector
+import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
+import org.apache.mahout.math.{Matrix, Vector}
 import org.apache.mahout.math.drm._
 import org.apache.mahout.math.drm.logical.OpAtB
 import org.apache.mahout.math.scalabindings.RLikeOps._
 
-import com.google.common.collect.Lists
-
-import org.apache.flink.api.scala._
+import scala.collection.JavaConverters.asScalaBufferConverter
 
 /**
  * Implementation is taken from Spark's AtB
@@ -47,7 +40,6 @@ import org.apache.flink.api.scala._
 object FlinkOpAtB {
 
   def notZippable[A](op: OpAtB[A], At: FlinkDrm[A], B: FlinkDrm[A]): 
FlinkDrm[Int] = {
-
     val rowsAt = At.asRowWise.ds.asInstanceOf[DrmDataSet[A]]
     val rowsB = B.asRowWise.ds.asInstanceOf[DrmDataSet[A]]
     val joined = rowsAt.join(rowsB).where(0).equalTo(0)
@@ -75,20 +67,21 @@ object FlinkOpAtB {
     })
 
     val res: BlockifiedDrmDataSet[Int] = 
-      preProduct.groupBy(0).reduceGroup(new GroupReduceFunction[(Int, Matrix), 
BlockifiedDrmTuple[Int]] {
-      def reduce(values: Iterable[(Int, Matrix)], out: 
Collector[BlockifiedDrmTuple[Int]]): Unit = {
-        val it = Lists.newArrayList(values).asScala
-        val (idx, _) = it.head
+      preProduct.groupBy(0).reduceGroup(
+        new GroupReduceFunction[(Int, Matrix), BlockifiedDrmTuple[Int]] {
+          def reduce(values: Iterable[(Int, Matrix)], out: 
Collector[BlockifiedDrmTuple[Int]]): Unit = {
+            val it = Lists.newArrayList(values).asScala
+            val (idx, _) = it.head
 
-        val block = it.map { t => t._2 }.reduce { (m1, m2) => m1 + m2 }
-        
-        val blockStart = idx * blockHeight
-        val keys = Array.tabulate(block.nrow)(blockStart + _)
+            val block = it.map { t => t._2 }.reduce { (m1, m2) => m1 + m2 }
 
-        out.collect(keys -> block)
-      }
-    })
+            val blockStart = idx * blockHeight
+            val keys = Array.tabulate(block.nrow)(blockStart + _)
 
+            out.collect(keys -> block)
+          }
+        }
+      )
 
     new BlockifiedFlinkDrm[Int](res, ncol)
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/e3c8db50/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
index 79f5fe8..ec20b6d 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala
@@ -18,19 +18,16 @@
  */
 package org.apache.mahout.flinkbindings.blas
 
-import java.util.List
+import java.util
 
 import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
 import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm}
-import org.apache.mahout.math.{Matrix, Vector}
 import org.apache.mahout.math.drm.logical.OpAx
 import org.apache.mahout.math.scalabindings.RLikeOps._
-
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.scala._
+import org.apache.mahout.math.{Matrix, Vector}
 
 
 /**
@@ -50,7 +47,7 @@ object FlinkOpAx {
 
       override def open(params: Configuration): Unit = {
         val runtime = this.getRuntimeContext
-        val dsX: List[Vector] = runtime.getBroadcastVariable("vector")
+        val dsX: util.List[Vector] = runtime.getBroadcastVariable("vector")
         x = dsX.get(0)
       }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/e3c8db50/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
index f0dd620..10ce545 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -38,7 +38,7 @@ package object flinkbindings {
   type DrmDataSet[K] = DataSet[DrmTuple[K]]
 
   /**
-   * Blockifed DRM dataset (keys of original DRM are grouped into array 
corresponding to rows of Matrix
+   * Blockified DRM dataset (keys of original DRM are grouped into array 
corresponding to rows of Matrix
    * object value
    */
   type BlockifiedDrmDataSet[K] = DataSet[BlockifiedDrmTuple[K]]

http://git-wip-us.apache.org/repos/asf/mahout/blob/e3c8db50/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala 
b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
index 725f31a..fe2277c 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
@@ -18,17 +18,11 @@
  */
 package org.apache.mahout.flinkbindings
 
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.math._
-import org.apache.mahout.math.drm._
 import org.apache.mahout.math.drm.RLikeDrmOps._
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math.drm._
 import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.junit.runner.RunWith
+import org.apache.mahout.math.scalabindings._
 import org.scalatest.FunSuite
-import org.scalatest.junit.JUnitRunner
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
 
 
 class DrmLikeOpsSuite extends FunSuite with DistributedFlinkSuite {

http://git-wip-us.apache.org/repos/asf/mahout/blob/e3c8db50/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala 
b/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala
index 0a5f145..fa49114 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala
@@ -18,18 +18,16 @@
  */
 package org.apache.mahout.flinkbindings
 
-import scala.util.hashing.MurmurHash3
-import org.apache.mahout.math.Matrices
-import org.apache.mahout.math.Vector
-import org.apache.mahout.math.drm._
+import org.apache.mahout.math.{Matrices, Vector}
 import org.apache.mahout.math.drm.RLikeDrmOps._
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.drm._
 import org.apache.mahout.math.function.IntIntFunction
-import org.junit.runner.RunWith
-import org.slf4j.LoggerFactory
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
 import org.scalatest.FunSuite
-import org.scalatest.junit.JUnitRunner
+import org.slf4j.LoggerFactory
+
+import scala.util.hashing.MurmurHash3
 
 class UseCasesSuite extends FunSuite with DistributedFlinkSuite {
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/e3c8db50/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala 
b/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
index 81ca737..95d0969 100644
--- 
a/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
+++ 
b/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
@@ -18,18 +18,15 @@
  */
 package org.apache.mahout.flinkbindings.blas
 
-import org.scalatest.FunSuite
-import org.apache.mahout.math._
-import scalabindings._
-import RLikeOps._
-import drm._
 import org.apache.flink.api.scala._
 import org.apache.mahout.flinkbindings._
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.apache.mahout.math.drm.logical.OpAx
 import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm
-import org.apache.mahout.math.drm.logical._
+import org.apache.mahout.math._
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.drm.logical.{OpAx, _}
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
+import org.scalatest.FunSuite
 
 class LATestSuite extends FunSuite with DistributedFlinkSuite {
 
@@ -115,7 +112,7 @@ class LATestSuite extends FunSuite with 
DistributedFlinkSuite {
     val res = FlinkOpCBind.cbind(op, A, B)
 
     val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow,
-        _ncol=(inCoreA.ncol + inCoreB.ncol))
+        _ncol= inCoreA.ncol + inCoreB.ncol)
     val output = drm.collect
 
     val expected = dense((1, 2, 4, 4), (2, 3, 5, 5), (3, 4, 6, 7))
@@ -130,7 +127,7 @@ class LATestSuite extends FunSuite with 
DistributedFlinkSuite {
     val res = FlinkOpCBind.cbindScalar(op, A, 1)
 
     val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow,
-        _ncol=(inCoreA.ncol + 1))
+        _ncol= inCoreA.ncol + 1)
     val output = drm.collect
 
     val expected = dense((1, 1, 2), (1, 2, 3), (1, 3, 4))
@@ -145,7 +142,7 @@ class LATestSuite extends FunSuite with 
DistributedFlinkSuite {
     val res = FlinkOpCBind.cbindScalar(op, A, 1)
 
     val drm = new CheckpointedFlinkDrm(res.asRowWise.ds, _nrow=inCoreA.nrow,
-        _ncol=(inCoreA.ncol + 1))
+        _ncol= inCoreA.ncol + 1)
     val output = drm.collect
 
     val expected = dense((1, 2, 1), (2, 3, 1), (3, 4, 1))

Reply via email to