Repository: mahout
Updated Branches:
  refs/heads/flink-binding 92a2f6c8f -> 072289a46


http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala
new file mode 100644
index 0000000..50d3bc6
--- /dev/null
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mahout.flinkbindings.io
+
+import org.apache.hadoop.io.{ Writable, SequenceFile }
+import org.apache.hadoop.fs.{ FileSystem, Path }
+import org.apache.hadoop.conf.Configuration
+import collection._
+import JavaConversions._
+
+/**
+ * Deprecated Hadoop 1 API which we currently explicitly import via Mahout dependencies. May not work
+ * with Hadoop 2.0.
+ *
+ * Copied from /spark/src/main/scala/org/apache/mahout/common
+ */
+object Hadoop2HDFSUtil extends HDFSUtil {
+
+  /**
+   * Read the header of a sequence file and determine the key and value Writable types.
+   * @param path path of the sequence file (or a directory of part files) to inspect
+   * @return DrmMetadata describing the key and value Writable types
+   */
+  def readDrmHeader(path: String): DrmMetadata = {
+    val dfsPath = new Path(path)
+    val conf = new Configuration()
+    val fs = dfsPath.getFileSystem(conf)
+
+    fs.setConf(conf)
+
+    val partFilePath: Path = fs.listStatus(dfsPath)
+
+      // Filter out anything starting with "." or "_", and directories
+      .filter { s =>
+        !s.getPath.getName.startsWith(".") && !s.getPath.getName.startsWith("_") && !s.isDir
+      }
+
+      // Take path
+      .map(_.getPath)
+
+      // Take only one, if any
+      .headOption
+
+      // Require there's at least one partition file found.
+      .getOrElse {
+        throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.")
+      }
+
+    // flink is retiring hadoop 1
+    val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf)
+
+    // hadoop 2 reader
+//    val reader: SequenceFile.Reader = new SequenceFile.Reader(fs.getConf,
+//      SequenceFile.Reader.file(partFilePath));
+    try {
+      new DrmMetadata(
+        keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
+        valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable]))
+    } finally {
+      reader.close()
+    }
+
+  }
+
+  /**
+   * Delete a path from the filesystem (recursively, if it is a directory)
+   * @param path path to delete
+   */
+  def delete(path: String) {
+    val dfsPath = new Path(path)
+    val fs = dfsPath.getFileSystem(new Configuration())
+
+    if (fs.exists(dfsPath)) {
+      fs.delete(dfsPath, true)
+    }
+  }
+
+}

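For reference, the Hadoop 2 style reader that the commented-out lines above allude to would be constructed roughly as follows. This is a minimal sketch, not the committed code; the part file path is a made-up example:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.SequenceFile

    val conf = new Configuration()
    // Hadoop 2 replaces the (fs, path, conf) constructor with Reader.Option varargs
    val reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path("/tmp/drm/part-00000")))
    try {
      // the key/value classes recorded in the sequence file header
      val keyClass = reader.getKeyClass
      val valueClass = reader.getValueClass
      println(s"key=$keyClass value=$valueClass")
    } finally {
      reader.close()
    }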
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
index 6b8f2ae..b083752 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -18,16 +18,15 @@
  */
 package org.apache.mahout
 
-import org.apache.flink.api.common.functions.{FilterFunction, MapFunction}
-import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
-import org.apache.mahout.flinkbindings.drm.{CheckpointedFlinkDrmOps, CheckpointedFlinkDrm, FlinkDrm, RowsFlinkDrm}
-import org.apache.mahout.math.{DenseVector, Matrix, MatrixWritable, Vector, VectorWritable}
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.utils._
+import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
+import org.apache.mahout.flinkbindings.drm.{CheckpointedFlinkDrm, CheckpointedFlinkDrmOps, FlinkDrm, RowsFlinkDrm}
 import org.apache.mahout.math.drm.{BlockifiedDrmTuple, CheckpointedDrm, DistributedContext, DrmTuple, _}
+import org.apache.mahout.math.{DenseVector, Matrix, MatrixWritable, Vector, VectorWritable}
 import org.slf4j.LoggerFactory
 
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.utils._
-
 import scala.Array._
 import scala.reflect.ClassTag
 
@@ -44,7 +43,6 @@ package object flinkbindings {
    */
   type BlockifiedDrmDataSet[K] = DataSet[BlockifiedDrmTuple[K]]
 
-  
   implicit def wrapMahoutContext(context: DistributedContext): FlinkDistributedContext = {
     assert(context.isInstanceOf[FlinkDistributedContext], "it must be FlinkDistributedContext")
     context.asInstanceOf[FlinkDistributedContext]
@@ -62,7 +60,7 @@ package object flinkbindings {
     drm.asInstanceOf[CheckpointedFlinkDrm[K]]
   }
 
-  implicit def checkpointedDrmToFlinkDrm[K: ClassTag](cp: CheckpointedDrm[K]): FlinkDrm[K] = {
+  implicit def checkpointedDrmToFlinkDrm[K: TypeInformation: ClassTag](cp: CheckpointedDrm[K]): FlinkDrm[K] = {
     val flinkDrm = castCheckpointedDrm(cp)
     new RowsFlinkDrm[K](flinkDrm.ds, flinkDrm.ncol)
   }
@@ -83,10 +81,8 @@ package object flinkbindings {
   def readCsv(file: String, delim: String = ",", comment: String = "#")
              (implicit dc: DistributedContext): CheckpointedDrm[Long] = {
     val vectors = dc.env.readTextFile(file)
-      .filter(new FilterFunction[String] {
-        def filter(in: String): Boolean = {
-          !in.startsWith(comment)
-        }
+      .filter((in: String) => {
+        !in.startsWith(comment)
       })
       .map(new MapFunction[String, Vector] {
         def map(in: String): Vector = {

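The added TypeInformation context bound reflects how the Flink Scala DataSet API works: transformations such as map need an implicit TypeInformation (and ClassTag) for the result element type, so generic code over an abstract K has to carry that evidence itself. A minimal sketch of the pattern, with a hypothetical helper that is not part of the project:

    import org.apache.flink.api.scala._
    import scala.reflect.ClassTag

    // the compiler cannot summon TypeInformation for an abstract K,
    // so the caller must supply it via the context bound
    def keysOf[K: TypeInformation: ClassTag](ds: DataSet[(K, Double)]): DataSet[K] =
      ds.map(_._1)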
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala
index 6fb71ea..41c7a6a 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala
@@ -31,6 +31,8 @@ trait DistributedFlinkSuite extends DistributedMahoutSuite { this: Suite =>
 
   def initContext() {
     env = ExecutionEnvironment.getExecutionEnvironment
+    // set this higher so that tests like dsqDist(X,Y) have enough available slots to pass on a single machine.
+    env.setParallelism(10)
     mahoutCtx = wrapContext(env)
   }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
index 83d7f43..725f31a 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala
@@ -31,7 +31,6 @@ import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
 
-@RunWith(classOf[JUnitRunner])
 class DrmLikeOpsSuite extends FunSuite with DistributedFlinkSuite {
 
   test("norm") {

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala
new file mode 100644
index 0000000..b834912
--- /dev/null
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.mahout.flinkbindings
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala.DataSet
+import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
+import org.apache.mahout.common.RandomUtils
+
+import scala.collection.immutable.List
+
+//import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
+import org.apache.hadoop.io.IntWritable
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, SequenceFileOutputFormat}
+import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math._
+import org.apache.mahout.math.drm._
+import RLikeDrmOps._
+import RLikeOps._
+import math._
+
+import org.apache.mahout.math.decompositions._
+import org.scalatest.{FunSuite, Matchers}
+
+
+import scala.reflect.ClassTag
+import org.apache.flink.api.scala._
+
+
+
+class FailingTestsSuite extends FunSuite with DistributedFlinkSuite with Matchers {
+
+// // passing now
+//  test("Simple DataSet to IntWritable") {
+//    val path = TmpDir + "flinkOutput"
+//
+//    implicit val typeInfo = createTypeInformation[(Int,Int)]
+//    val ds = env.fromElements[(Int,Int)]((1,2),(3,4),(5,6),(7,8))
+//   // val job = new JobConf
+//
+//
+//    val writableDataset : DataSet[(IntWritable,IntWritable)] =
+//      ds.map( tuple =>
+//        (new IntWritable(tuple._1.asInstanceOf[Int]), new IntWritable(tuple._2.asInstanceOf[Int]))
+//    )
+//
+//    val job: Job = new Job()
+//
+//    job.setOutputKeyClass(classOf[IntWritable])
+//    job.setOutputValueClass(classOf[IntWritable])
+//
+//    // setup sink for IntWritable
+//    val sequenceFormat = new SequenceFileOutputFormat[IntWritable, IntWritable]
+//    val hadoopOutput  = new HadoopOutputFormat[IntWritable,IntWritable](sequenceFormat, job)
+//    FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path))
+//
+//    writableDataset.output(hadoopOutput)
+//
+//    env.execute(s"dfsWrite($path)")
+//
+//  }
+
+
+  test("C = A + B, identically partitioned") {
+
+    val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7))
+
+    val A = drmParallelize(inCoreA, numPartitions = 2)
+
+     //   printf("A.nrow=%d.\n", A.rdd.count())
+
+    // Create B which would be identically partitioned to A. mapBlock() by default will do the trick.
+    val B = A.mapBlock() {
+      case (keys, block) =>
+        val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble()}
+        keys -> bBlock
+    }
+      // Prevent repeated computation non-determinism
+      // the Flink problem is here... checkpoint is not doing what it should,
+      // i.e. create a physical plan w/o side effects
+      .checkpoint()
+
+    val inCoreB = B.collect
+
+    printf("A=\n%s\n", inCoreA)
+    printf("B=\n%s\n", inCoreB)
+
+    val C = A + B
+
+    val inCoreC = C.collect
+
+    printf("C=\n%s\n", inCoreC)
+
+    // Actual
+    val inCoreCControl = inCoreA + inCoreB
+
+    (inCoreC - inCoreCControl).norm should be < 1E-10
+  }
+//// Passing now.
+//  test("C = inCoreA %*%: B") {
+//
+//    val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7))
+//    val inCoreB = dense((3, 5, 7, 10), (4, 6, 9, 10), (5, 6, 7, 7))
+//
+//    val B = drmParallelize(inCoreB, numPartitions = 2)
+//    val C = inCoreA %*%: B
+//
+//    val inCoreC = C.collect
+//    val inCoreCControl = inCoreA %*% inCoreB
+//
+//    println(inCoreC)
+//    (inCoreC - inCoreCControl).norm should be < 1E-10
+//
+//  }
+
+  test("dsqDist(X,Y)") {
+    val m = 100
+    val n = 300
+    val d = 7
+    val mxX = Matrices.symmetricUniformView(m, d, 12345).cloned -= 5
+    val mxY = Matrices.symmetricUniformView(n, d, 1234).cloned += 10
+    val (drmX, drmY) = (drmParallelize(mxX, 3), drmParallelize(mxY, 4))
+
+    val mxDsq = dsqDist(drmX, drmY).collect
+    val mxDsqControl = new DenseMatrix(m, n) := { (r, c, _) ⇒ (mxX(r, ::) - mxY(c, ::)) ^= 2 sum }
+    (mxDsq - mxDsqControl).norm should be < 1e-7
+  }
+
+  test("dsqDist(X)") {
+    val m = 100
+    val d = 7
+    val mxX = Matrices.symmetricUniformView(m, d, 12345).cloned -= 5
+    val drmX = drmParallelize(mxX, 3)
+
+    val mxDsq = dsqDist(drmX).collect
+    val mxDsqControl = sqDist(drmX)
+    (mxDsq - mxDsqControl).norm should be < 1e-7
+  }
+
+//// passing now
+//  test("DRM DFS i/o (local)") {
+//
+//    val uploadPath = TmpDir + "UploadedDRM"
+//
+//    val inCoreA = dense((1, 2, 3), (3, 4, 5))
+//    val drmA = drmParallelize(inCoreA)
+//
+//    drmA.dfsWrite(path = uploadPath)
+//
+//    println(inCoreA)
+//
+//    // Load back from hdfs
+//    val drmB = drmDfsRead(path = uploadPath)
+//
+//    // Make sure keys are correctly identified as ints
+//    drmB.checkpoint(CacheHint.NONE).keyClassTag shouldBe ClassTag.Int
+//
+//    // Collect back into in-core
+//    val inCoreB = drmB.collect
+//
+//    // Print out to see what it is we collected:
+//    println(inCoreB)
+//
+//    (inCoreA - inCoreB).norm should be < 1e-7
+//  }
+
+
+
+  test("dspca") {
+
+    val rnd = RandomUtils.getRandom
+
+    // Number of points
+    val m = 500
+    // Length of actual spectrum
+    val spectrumLen = 40
+
+    val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
+    printf("spectrum:%s\n", spectrum)
+
+    val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) :=
+      ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0))
+
+    // PCA Rotation matrix -- should also be orthonormal.
+    val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0)
+
+    val input = (u %*%: diagv(spectrum)) %*% tr.t
+    val drmInput = drmParallelize(m = input, numPartitions = 2)
+
+    // Calculate just first 10 principal factors and reduce dimensionality.
+    // Since we assert just validity of the s-pca, not stochastic error, we bump the p parameter to
+    // ensure zero stochastic error and assert only functional correctness of the method's
+    // pca-specific additions.
+    val k = 10
+
+    // Calculate just first 10 principal factors and reduce dimensionality.
+    var (drmPCA, _, s) = dspca(drmA = drmInput, k = 10, p = spectrumLen, q = 1)
+    // Un-normalized pca data:
+    drmPCA = drmPCA %*% diagv(s)
+
+    val pca = drmPCA.checkpoint(CacheHint.NONE).collect
+
+    // Of course, once we calculated the pca, the spectrum is going to be different since our originally
+    // generated input was not centered. So here, we'd just brute-solve pca to verify
+    val xi = input.colMeans()
+    for (r <- 0 until input.nrow) input(r, ::) -= xi
+    var (pcaControl, _, sControl) = svd(m = input)
+    pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k)
+
+    printf("pca:\n%s\n", pca(0 until 10, 0 until 10))
+    printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10))
+
+    (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5
+
+  }
+
+  test("dals") {
+
+    val rnd = RandomUtils.getRandom
+
+    // Number of points
+    val m = 500
+    val n = 500
+
+    // Length of actual spectrum
+    val spectrumLen = 40
+
+    // Create singular values with decay
+    val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
+    printf("spectrum:%s\n", spectrum)
+
+    // Create A as an ideal input
+    val inCoreA = (qr(Matrices.symmetricUniformView(m, spectrumLen, 1234))._1 %*%: diagv(spectrum)) %*%
+      qr(Matrices.symmetricUniformView(n, spectrumLen, 2345))._1.t
+    val drmA = drmParallelize(inCoreA, numPartitions = 2)
+
+    // Decompose using ALS
+    val (drmU, drmV, rmse) = dals(drmA = drmA, k = 20).toTuple
+    val inCoreU = drmU.collect
+    val inCoreV = drmV.collect
+
+    val predict = inCoreU %*% inCoreV.t
+
+    printf("Control block:\n%s\n", inCoreA(0 until 3, 0 until 3))
+    printf("ALS factorized approximation block:\n%s\n", predict(0 until 3, 0 until 3))
+
+    val err = (inCoreA - predict).norm
+    printf("norm of residuals %f\n", err)
+    printf("train iteration rmses: %s\n", rmse)
+
+    err should be < 15e-2
+
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala
index 6dcedd9..4aa524f 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala
@@ -1,16 +1,9 @@
 package org.apache.mahout.flinkbindings
 
-import org.apache.mahout.flinkbindings._
-import org.apache.mahout.math._
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.RLikeDrmOps._
-import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.junit.runner.RunWith
+import org.apache.mahout.math.scalabindings._
 import org.scalatest.FunSuite
-import org.scalatest.junit.JUnitRunner
 
-@RunWith(classOf[JUnitRunner])
 class FlinkByteBCastSuite extends FunSuite {
 
   test("BCast vector") {

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala
index 98318e3..3e14d76 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala
@@ -31,7 +31,6 @@ import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
 
-@RunWith(classOf[JUnitRunner])
 class RLikeOpsSuite extends FunSuite with DistributedFlinkSuite {
 
   val LOGGER = LoggerFactory.getLogger(getClass())

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala
index 07d62dc..0a5f145 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory
 import org.scalatest.FunSuite
 import org.scalatest.junit.JUnitRunner
 
-@RunWith(classOf[JUnitRunner])
 class UseCasesSuite extends FunSuite with DistributedFlinkSuite {
 
   val LOGGER = LoggerFactory.getLogger(getClass())

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
index a766146..81ca737 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala
@@ -23,15 +23,14 @@ import org.apache.mahout.math._
 import scalabindings._
 import RLikeOps._
 import drm._
+import org.apache.flink.api.scala._
 import org.apache.mahout.flinkbindings._
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.apache.mahout.math.drm.logical.OpAx
 import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm
-import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm
 import org.apache.mahout.math.drm.logical._
 
-@RunWith(classOf[JUnitRunner])
 class LATestSuite extends FunSuite with DistributedFlinkSuite {
 
   test("Ax blockified") {

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DistributedDecompositionsSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DistributedDecompositionsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DistributedDecompositionsSuite.scala
index f13597a..82ca3ff 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DistributedDecompositionsSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DistributedDecompositionsSuite.scala
@@ -12,7 +12,6 @@ import org.scalatest.junit.JUnitRunner
 import org.apache.mahout.math.decompositions.DistributedDecompositionsSuiteBase
 
 
-@RunWith(classOf[JUnitRunner])
 class DistributedDecompositionsSuite extends FunSuite with DistributedFlinkSuite
       with DistributedDecompositionsSuiteBase {
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala
index 7f6b2c8..325d118 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala
@@ -12,7 +12,6 @@ import org.scalatest.junit.JUnitRunner
 import org.apache.mahout.math.decompositions.DistributedDecompositionsSuiteBase
 
 
-@RunWith(classOf[JUnitRunner])
 class DrmLikeOpsSuite extends FunSuite with DistributedFlinkSuite
       with DrmLikeOpsSuiteBase {
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala
index 7cfc48b..dfa7360 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala
@@ -12,7 +12,6 @@ import org.scalatest.junit.JUnitRunner
 import org.apache.mahout.math.decompositions.DistributedDecompositionsSuiteBase
 
 
-@RunWith(classOf[JUnitRunner])
 class DrmLikeSuite extends FunSuite with DistributedFlinkSuite
       with DrmLikeSuiteBase {
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala
new file mode 100644
index 0000000..d6feed9
--- /dev/null
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala
@@ -0,0 +1,11 @@
+package org.apache.mahout.flinkbindings.standard
+
+import org.apache.mahout.classifier.naivebayes.NBTestBase
+import org.apache.mahout.flinkbindings._
+import org.scalatest.FunSuite
+
+
+class NaiveBayesTestSuite extends FunSuite with DistributedFlinkSuite
+      with NBTestBase {
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala
index 1ba03b1..c0ff76c 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala
@@ -1,18 +1,10 @@
 package org.apache.mahout.flinkbindings.standard
 
 import org.apache.mahout.flinkbindings._
-import org.apache.mahout.math._
 import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.RLikeDrmOps._
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.junit.runner.RunWith
 import org.scalatest.FunSuite
-import org.scalatest.junit.JUnitRunner
-import org.apache.mahout.math.decompositions.DistributedDecompositionsSuiteBase
 
 
-@RunWith(classOf[JUnitRunner])
 class RLikeDrmOpsSuite extends FunSuite with DistributedFlinkSuite
       with RLikeDrmOpsSuiteBase {
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/math-scala/pom.xml
----------------------------------------------------------------------
diff --git a/math-scala/pom.xml b/math-scala/pom.xml
index 0124612..deaadc4 100644
--- a/math-scala/pom.xml
+++ b/math-scala/pom.xml
@@ -125,7 +125,7 @@
     <dependency>
       <groupId>com.esotericsoftware.kryo</groupId>
       <artifactId>kryo</artifactId>
-      <version>2.21</version>
+      <version>2.24.0</version>
     </dependency>
 
     <!--  3rd-party -->

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
index e8ac475..016171d 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
 
 /** Logical Times-left over in-core matrix operand */
 case class OpTimesLeftMatrix(
-    val left: Matrix,
+    left: Matrix,
     override var A: DrmLike[Int]
     ) extends AbstractUnaryOp[Int, Int] {
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
index ecb557b..34b1823 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -45,7 +45,7 @@ package object drm {
   //  type CacheHint = CacheHint.CacheHint
 
   def safeToNonNegInt(x: Long): Int = {
-    assert(x == x << -31 >>> -31, "transformation from long to Int is losing signficant bits, or is a negative number")
+    assert(x == x << -31 >>> -31, "transformation from long to Int is losing significant bits, or is a negative number")
     x.toInt
   }
 
@@ -175,7 +175,7 @@ package object drm {
     import RLikeDrmOps._
 
     val drmAcp = drmA.checkpoint()
-    
+
     val mu = drmAcp colMeans
 
     // Compute variance using mean(x^2) - mean(x)^2

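The bit trick in safeToNonNegInt above works because a Long shift amount is taken mod 64, so -31 behaves as 33; x << -31 >>> -31 therefore keeps only the low 31 bits of x, and the assert checks that nothing was lost. A small worked example, purely illustrative:

    val fits: Long = 5L
    assert(fits == (fits << -31 >>> -31))        // 5 needs only low bits, check passes

    val tooBig: Long = Int.MaxValue.toLong + 1   // needs bit 31, would be negative as an Int
    assert(tooBig != (tooBig << -31 >>> -31))    // check fails, so safeToNonNegInt would trip its assertion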
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala
index d0fd393..d72d2f0 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala
@@ -24,7 +24,6 @@ import RLikeOps._
 import org.apache.mahout.logging._
 
 import scala.collection.JavaConversions._
-import scala.collection._
 
 object MMul extends MMBinaryFunc {
 
@@ -46,32 +45,32 @@ object MMul extends MMBinaryFunc {
         sd match {
 
           // Multiplication cases by a diagonal matrix.
-          case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.COLWISE, _) if (a
-            .isInstanceOf[DiagonalMatrix]) ⇒ jvmDiagCW _
-          case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.SPARSECOLWISE, _) if (a
-            .isInstanceOf[DiagonalMatrix]) ⇒ jvmDiagCW _
-          case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.ROWWISE, _) if (a
-            .isInstanceOf[DiagonalMatrix]) ⇒ jvmDiagRW _
-          case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.SPARSEROWWISE, _) if (a
-            .isInstanceOf[DiagonalMatrix]) ⇒ jvmDiagRW _
-
-          case (TraversingStructureEnum.COLWISE, _, TraversingStructureEnum.VECTORBACKED, _) if (b
-            .isInstanceOf[DiagonalMatrix]) ⇒ jvmCWDiag _
-          case (TraversingStructureEnum.SPARSECOLWISE, _, TraversingStructureEnum.VECTORBACKED, _) if (b
-            .isInstanceOf[DiagonalMatrix]) ⇒ jvmCWDiag _
-          case (TraversingStructureEnum.ROWWISE, _, TraversingStructureEnum.VECTORBACKED, _) if (b
-            .isInstanceOf[DiagonalMatrix]) ⇒ jvmRWDiag _
-          case (TraversingStructureEnum.SPARSEROWWISE, _, TraversingStructureEnum.VECTORBACKED, _) if (b
-            .isInstanceOf[DiagonalMatrix]) ⇒ jvmRWDiag _
+          case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.COLWISE, _)
+            if a.isInstanceOf[DiagonalMatrix] ⇒ jvmDiagCW
+          case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.SPARSECOLWISE, _)
+            if a.isInstanceOf[DiagonalMatrix] ⇒ jvmDiagCW
+          case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.ROWWISE, _)
+            if a.isInstanceOf[DiagonalMatrix] ⇒ jvmDiagRW
+          case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.SPARSEROWWISE, _)
+            if a.isInstanceOf[DiagonalMatrix] ⇒ jvmDiagRW
+
+          case (TraversingStructureEnum.COLWISE, _, TraversingStructureEnum.VECTORBACKED, _)
+            if b.isInstanceOf[DiagonalMatrix] ⇒ jvmCWDiag
+          case (TraversingStructureEnum.SPARSECOLWISE, _, TraversingStructureEnum.VECTORBACKED, _)
+            if b.isInstanceOf[DiagonalMatrix] ⇒ jvmCWDiag
+          case (TraversingStructureEnum.ROWWISE, _, TraversingStructureEnum.VECTORBACKED, _)
+            if b.isInstanceOf[DiagonalMatrix] ⇒ jvmRWDiag
+          case (TraversingStructureEnum.SPARSEROWWISE, _, TraversingStructureEnum.VECTORBACKED, _)
+            if b.isInstanceOf[DiagonalMatrix] ⇒ jvmRWDiag
 
           // Dense-dense cases
-          case (TraversingStructureEnum.ROWWISE, true, TraversingStructureEnum.COLWISE, true) if (a eq b.t) ⇒ jvmDRWAAt _
-          case (TraversingStructureEnum.ROWWISE, true, TraversingStructureEnum.COLWISE, true) if (a.t eq b) ⇒ jvmDRWAAt _
+          case (TraversingStructureEnum.ROWWISE, true, TraversingStructureEnum.COLWISE, true) if a eq b.t ⇒ jvmDRWAAt
+          case (TraversingStructureEnum.ROWWISE, true, TraversingStructureEnum.COLWISE, true) if a.t eq b ⇒ jvmDRWAAt
           case (TraversingStructureEnum.ROWWISE, true, TraversingStructureEnum.COLWISE, true) ⇒ jvmRWCW
           case (TraversingStructureEnum.ROWWISE, true, TraversingStructureEnum.ROWWISE, true) ⇒ jvmRWRW
           case (TraversingStructureEnum.COLWISE, true, TraversingStructureEnum.COLWISE, true) ⇒ jvmCWCW
-          case (TraversingStructureEnum.COLWISE, true, TraversingStructureEnum.ROWWISE, true) if ( a eq b.t) ⇒ jvmDCWAAt _
-          case (TraversingStructureEnum.COLWISE, true, TraversingStructureEnum.ROWWISE, true) if ( a.t eq b) ⇒ jvmDCWAAt _
+          case (TraversingStructureEnum.COLWISE, true, TraversingStructureEnum.ROWWISE, true) if a eq b.t ⇒ jvmDCWAAt
+          case (TraversingStructureEnum.COLWISE, true, TraversingStructureEnum.ROWWISE, true) if a.t eq b ⇒ jvmDCWAAt
           case (TraversingStructureEnum.COLWISE, true, TraversingStructureEnum.ROWWISE, true) ⇒ jvmCWRW
 
           // Sparse row matrix x sparse row matrix (array of vectors)
@@ -107,7 +106,7 @@ object MMul extends MMBinaryFunc {
           case (TraversingStructureEnum.COLWISE, false, TraversingStructureEnum.COLWISE, _) ⇒ jvmSparseCWCW2flips
 
           // Sparse methods are only effective if the first argument is sparse, so we need to do a swap.
-          case (_, _, _, false) ⇒ { (a, b, r) ⇒ apply(b.t, a.t, r.map {_.t}).t }
+          case (_, _, _, false) ⇒ (a, b, r) ⇒ apply(b.t, a.t, r.map {_.t}).t
 
           // Default jvm-jvm case.
           case _ ⇒ jvmRWCW

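Dropping the trailing underscores in the match arms above is safe because the expected type of the match expression is already a function type, so the compiler eta-expands the method references automatically. A tiny standalone illustration, not project code:

    def twice(x: Int): Int = x * 2

    val inferred: Int => Int = twice     // eta-expansion driven by the expected type
    val explicit: Int => Int = twice _   // the older, equivalent spelling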
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala
index b288c62..de8228e 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala
@@ -78,7 +78,7 @@ trait DistributedDecompositionsSuiteBase extends DistributedMahoutSuite with Mat
     printf("qControl2=\n%s\n", qControl2)
     printf("rControl2=\n%s\n", rControl2)
 
-    // Housholder approach seems to be a little bit more stable
+    // Householder approach seems to be a little bit more stable
     (rControl - inCoreR).norm should be < 1E-5
     (qControl - inCoreQ).norm should be < 1E-5
 
@@ -86,7 +86,7 @@ trait DistributedDecompositionsSuiteBase extends DistributedMahoutSuite with Mat
     (rControl2 - inCoreR).norm should be < 1E-10
     (qControl2 - inCoreQ).norm should be < 1E-10
 
-    // Assert orhtogonality:
+    // Assert orthogonality:
     // (a) Q[,j] dot Q[,j] == 1.0 for all j
     // (b) Q[,i] dot Q[,j] == 0.0 for all i != j
     for (col <- 0 until inCoreQ.ncol)

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
index b46ee30..f18d23b 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala
@@ -347,6 +347,9 @@ trait RLikeDrmOpsSuiteBase extends DistributedMahoutSuite with Matchers {
         keys -> bBlock
     }
         // Prevent repeated computation non-determinism
+        // removing this checkpoint() will cause the same error in Spark tests
+        // as we're seeing in Flink with this test, i.e. util.Random.nextDouble()
+        // is being called more than once (note that it is not seeded in the closure)
         .checkpoint()
 
     val inCoreB = B.collect

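One way to make the closure deterministic regardless of how many times the plan is evaluated is to seed the RNG from the block's keys instead of relying on checkpoint() alone. A rough sketch of that alternative (hypothetical; not what the suite actually does):

    val B = A.mapBlock() {
      case (keys, block) =>
        // derive a per-block seed from the keys so re-evaluation yields the same values
        val rnd = new scala.util.Random(keys.headOption.map(_.hashCode).getOrElse(0))
        val bBlock = block.like() := { (r, c, v) => rnd.nextDouble() }
        keys -> bBlock
    }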
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
index 3869830..f6deb15 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala
@@ -75,10 +75,10 @@ abstract class MahoutSparkDriver extends MahoutDriver {
   override protected def start() : Unit = {
     if (!_useExistingContext) {
       sparkConf.set("spark.kryo.referenceTracking", "false")
-        .set("spark.kryoserializer.buffer.mb", "200m")// this is default for Mahout optimizer, change it with -D option
+        .set("spark.kryoserializer.buffer.mb", "200")// this is default for Mahout optimizer, change it with -D option
         // the previous has been marked deprecated as of Spark 1.4 by the below line,
         // remove the above line when Spark finally retires above for below
-        .set("spark.kryoserializer.buffer", "200m")
+        .set("spark.kryoserializer.buffer", "200")
 
 
       if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "")

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
index b3a1ec2..fde37bf 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala
@@ -26,17 +26,17 @@ class MahoutSparkOptionParser(programName: String) extends MahoutOptionParser(pr
     opts = opts + ("appName" -> programName)
     note("\nSpark config options:")
 
-    opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can " +
+    opt[String]("master") abbr "ma" text ("Spark Master URL (optional). Default: \"local\". Note that you can " +
       "specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) =>
         options + ("master" -> x)
     }
 
-    opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each " +
+    opt[String]("sparkExecutorMem") abbr "sem" text ("Max Java heap available as \"executor memory\" on each " +
       "node (optional). Default: as Spark config specifies") action { (x, options) =>
         options + ("sparkExecutorMem" -> x)
     }
 
-    opt[(String, String)]("define") abbr ("D") unbounded() foreach { case (k, v) =>
+    opt[(String, String)]("define") abbr "D" unbounded() foreach { case (k, v) =>
       sparkConf.set(k, v)
     } validate { x =>
       if (x._2 != "") success else failure("Value <sparkConfValue> must be non-blank")

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
index 817c6ff..2cedc20 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala
@@ -20,7 +20,6 @@ package org.apache.mahout.drivers
 import org.apache.mahout.common.HDFSPathSearch
 import org.apache.mahout.math.cf.SimilarityAnalysis
 import org.apache.mahout.math.indexeddataset.{Schema, IndexedDataset, indexedDatasetDFSRead}
-import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import scala.collection.immutable.HashMap
 
 /**
@@ -63,7 +62,7 @@ object RowSimilarityDriver extends MahoutSparkDriver {
       opts = opts ++ RowSimilarityOptions
 
       note("\nAlgorithm control options:")
-      opt[Int]("maxObservations") abbr ("mo") action { (x, options) =>
+      opt[Int]("maxObservations") abbr "mo" action { (x, options) =>
         options + ("maxObservations" -> x)
       } text ("Max number of observations to consider per row (optional). Default: " +
         RowSimilarityOptions("maxObservations")) validate { x =>
@@ -96,7 +95,7 @@ object RowSimilarityDriver extends MahoutSparkDriver {
       //Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI
       parseGenericOptions()
 
-      help("help") abbr ("h") text ("prints this usage text\n")
+      help("help") abbr "h" text "prints this usage text\n"
 
     }
     parser.parse(args, parser.opts) map { opts =>

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
index b5f76e0..d4f1aea 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala
@@ -18,12 +18,12 @@
 package org.apache.mahout.drivers
 
 import org.apache.log4j.Logger
-import org.apache.mahout.math.indexeddataset._
-import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
-import org.apache.spark.SparkContext._
 import org.apache.mahout.math.RandomAccessSparseVector
-import org.apache.mahout.math.drm.{DrmLike, DrmLikeOps, DistributedContext, CheckpointedDrm}
+import org.apache.mahout.math.drm.DistributedContext
+import org.apache.mahout.math.indexeddataset._
 import org.apache.mahout.sparkbindings._
+import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
+
 import scala.collection.JavaConversions._
 
 /**
@@ -269,7 +269,7 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDatasetSpark]{
         val vector = if (sort) itemList.sortBy { elem => -elem._2 } else itemList
 
         // first get the external rowID token
-        if (!vector.isEmpty){
+        if (vector.nonEmpty){
           var line = rowIDDictionary_bcast.value.inverse.getOrElse(rowID, "INVALID_ROW_ID") + rowKeyDelim
           // for the rest of the row, construct the vector contents of elements (external column ID, strength value)
           for (item <- vector) {

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
index e9f2f95..eeed97a 100644
--- a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
+++ b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala
@@ -17,12 +17,11 @@
 
 package org.apache.mahout.drivers
 
-import org.apache.mahout.classifier.naivebayes._
-import org.apache.mahout.classifier.naivebayes.SparkNaiveBayes
+import org.apache.mahout.classifier.naivebayes.{SparkNaiveBayes, _}
 import org.apache.mahout.common.Hadoop1HDFSUtil
 import org.apache.mahout.math.drm
 import org.apache.mahout.math.drm.DrmLike
-import org.apache.mahout.math.drm.RLikeDrmOps.drm2RLikeOps
+
 import scala.collection.immutable.HashMap
 
 
@@ -48,33 +47,33 @@ object TrainNBDriver extends MahoutSparkDriver {
 
       // default trainComplementary is false
       opts = opts + ("trainComplementary" -> false)
-      opt[Unit]("trainComplementary") abbr ("c") action { (_, options) =>
+      opt[Unit]("trainComplementary") abbr "c" action { (_, options) =>
         options + ("trainComplementary" -> true)
-      } text ("Train a complementary model, Default: false.")
+      } text "Train a complementary model, Default: false."
 
       // Laplace smoothing parameter default is 1.0
       opts = opts + ("alphaI" -> 1.0)
-      opt[Double]("alphaI") abbr ("a") action { (x, options) =>
+      opt[Double]("alphaI") abbr "a" action { (x, options) =>
         options + ("alphaI" -> x)
-      } text ("Laplace soothing factor default is 1.0") validate { x =>
+      } text "Laplace smoothing factor default is 1.0" validate { x =>
         if (x > 0) success else failure("Option --alphaI must be > 0")
       }
 
       // Overwrite the output directory (with the model) if it exists?  Default: false
       opts = opts + ("overwrite" -> false)
-      opt[Unit]("overwrite") abbr ("ow") action { (_, options) =>
+      opt[Unit]("overwrite") abbr "ow" action { (_, options) =>
         options + ("overwrite" -> true)
-      } text ("Overwrite the output directory (with the model) if it exists? Default: false")
+      } text "Overwrite the output directory (with the model) if it exists? Default: false"
 
       // Spark config options--not driver specific
       parseSparkOptions()
 
-      help("help") abbr ("h") text ("prints this usage text\n")
+      help("help") abbr "h" text "prints this usage text\n"
 
     }
     parser.parse(args, parser.opts) map { opts =>
       parser.opts = opts
-      process()
+      process
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala
index 4d13a5a..96ba8cd 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala
@@ -17,7 +17,7 @@
 
 package org.apache.mahout.sparkbindings
 
-import org.apache.mahout.math.drm.{DistributedEngine, BCast, DistributedContext}
+import org.apache.mahout.math.drm.{DistributedEngine, DistributedContext}
 import org.apache.spark.SparkContext
 
 class SparkDistributedContext(val sc: SparkContext) extends DistributedContext {

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
index ffb164c..676b496 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
@@ -85,7 +85,7 @@ object ABt {
       s"A=${operator.A.nrow}x${operator.A.ncol}, B=${operator.B.nrow}x${operator.B.ncol},AB'=${prodNRow}x$prodNCol."
     )
 
-    // blockwise multimplication function
+    // blockwise multiplication function
     def mmulFunc(tupleA: BlockifiedDrmTuple[K], tupleB: BlockifiedDrmTuple[Int]): (Array[K], Array[Int], Matrix) = {
       val (keysA, blockA) = tupleA
       val (keysB, blockB) = tupleB

http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
index a9dc874..4c75e75 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala
@@ -40,8 +40,8 @@ trait DistributedSparkSuite extends DistributedMahoutSuite with LoggerConfigurat
       // Do not run MAHOUT_HOME jars in unit tests.
       addMahoutJars = !isLocal,
       sparkConf = new SparkConf()
-          .set("spark.kryoserializer.buffer.mb", "40m")
-          .set("spark.kryoserializer.buffer", "40m")
+          .set("spark.kryoserializer.buffer.mb", "40")
+          .set("spark.kryoserializer.buffer", "40")
           .set("spark.akka.frameSize", "30")
           .set("spark.default.parallelism", "10")
           .set("spark.executor.memory", "2G")

Reply via email to