Repository: mahout Updated Branches: refs/heads/flink-binding 92a2f6c8f -> 072289a46
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala new file mode 100644 index 0000000..50d3bc6 --- /dev/null +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/io/Hadoop2HDFSUtil.scala @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.mahout.flinkbindings.io + +import org.apache.hadoop.io.{ Writable, SequenceFile } +import org.apache.hadoop.fs.{ FileSystem, Path } +import org.apache.hadoop.conf.Configuration +import collection._ +import JavaConversions._ + +/** + * Deprecated Hadoop 1 API which we currently explicitly import via Mahout dependencies. May not work + * with Hadoop 2.0. + * + * Copied from /spark/src/main/scala/org/apache/mahout/common + */ +object Hadoop2HDFSUtil extends HDFSUtil { + + /** + * Read the header of a sequence file and determine the Key and Value type + * @param path path to the DRM's directory of sequence file parts + * @return DrmMetadata carrying the key and value Writable classes + */ + def readDrmHeader(path: String): DrmMetadata = { + val dfsPath = new Path(path) + val conf = new Configuration() + val fs = dfsPath.getFileSystem(conf) + + fs.setConf(conf) + + val partFilePath: Path = fs.listStatus(dfsPath) + + // Filter out anything starting with . + .filter { s => + !s.getPath.getName.startsWith(".") && !s.getPath.getName.startsWith("_") && !s.isDir + } + + // Take path + .map(_.getPath) + + // Take only one, if any + .headOption + + // Require there's at least one partition file found.
+ .getOrElse { + throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.") + } + + // flink is retiring hadoop 1 + val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf) + + // hadoop 2 reader +// val reader: SequenceFile.Reader = new SequenceFile.Reader(fs.getConf, +// SequenceFile.Reader.file(partFilePath)); + try { + new DrmMetadata( + keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]), + valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable])) + } finally { + reader.close() + } + + } + + /** + * Delete a path from the filesystem + * @param path + */ + def delete(path: String) { + val dfsPath = new Path(path) + val fs = dfsPath.getFileSystem(new Configuration()) + + if (fs.exists(dfsPath)) { + fs.delete(dfsPath, true) + } + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala index 6b8f2ae..b083752 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala @@ -18,16 +18,15 @@ */ package org.apache.mahout -import org.apache.flink.api.common.functions.{FilterFunction, MapFunction} -import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment} -import org.apache.mahout.flinkbindings.drm.{CheckpointedFlinkDrmOps, CheckpointedFlinkDrm, FlinkDrm, RowsFlinkDrm} -import org.apache.mahout.math.{DenseVector, Matrix, MatrixWritable, Vector, VectorWritable} +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.utils._ +import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _} +import org.apache.mahout.flinkbindings.drm.{CheckpointedFlinkDrm, CheckpointedFlinkDrmOps, FlinkDrm, RowsFlinkDrm} import org.apache.mahout.math.drm.{BlockifiedDrmTuple, CheckpointedDrm, DistributedContext, DrmTuple, _} +import org.apache.mahout.math.{DenseVector, Matrix, MatrixWritable, Vector, VectorWritable} import org.slf4j.LoggerFactory -import org.apache.flink.api.scala._ -import org.apache.flink.api.scala.utils._ - import scala.Array._ import scala.reflect.ClassTag @@ -44,7 +43,6 @@ package object flinkbindings { */ type BlockifiedDrmDataSet[K] = DataSet[BlockifiedDrmTuple[K]] - implicit def wrapMahoutContext(context: DistributedContext): FlinkDistributedContext = { assert(context.isInstanceOf[FlinkDistributedContext], "it must be FlinkDistributedContext") context.asInstanceOf[FlinkDistributedContext] @@ -62,7 +60,7 @@ package object flinkbindings { drm.asInstanceOf[CheckpointedFlinkDrm[K]] } - implicit def checkpointedDrmToFlinkDrm[K: ClassTag](cp: CheckpointedDrm[K]): FlinkDrm[K] = { + implicit def checkpointedDrmToFlinkDrm[K: TypeInformation: ClassTag](cp: CheckpointedDrm[K]): FlinkDrm[K] = { val flinkDrm = castCheckpointedDrm(cp) new RowsFlinkDrm[K](flinkDrm.ds, flinkDrm.ncol) } @@ -83,10 +81,8 @@ package object flinkbindings { def readCsv(file: String, delim: String = ",", comment: String = "#") (implicit dc: DistributedContext): CheckpointedDrm[Long] = { val vectors = dc.env.readTextFile(file) - .filter(new FilterFunction[String] { - def filter(in: String): Boolean = { - !in.startsWith(comment) - } + .filter((in: String) => { + 
!in.startsWith(comment) }) .map(new MapFunction[String, Vector] { def map(in: String): Vector = { http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala index 6fb71ea..41c7a6a 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala @@ -31,6 +31,8 @@ trait DistributedFlinkSuite extends DistributedMahoutSuite { this: Suite => def initContext() { env = ExecutionEnvironment.getExecutionEnvironment + // set this higher so that tests like dsqDist(X,Y) have enough available slots to pass on a single machine. + env.setParallelism(10) mahoutCtx = wrapContext(env) } http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala index 83d7f43..725f31a 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala @@ -31,7 +31,6 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory -@RunWith(classOf[JUnitRunner]) class DrmLikeOpsSuite extends FunSuite with DistributedFlinkSuite { test("norm") { http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala new file mode 100644 index 0000000..b834912 --- /dev/null +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/FailingTestsSuite.scala @@ -0,0 +1,272 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.mahout.flinkbindings + +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.scala.DataSet +import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat +import org.apache.mahout.common.RandomUtils + +import scala.collection.immutable.List + +//import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat +import org.apache.hadoop.io.IntWritable +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, SequenceFileOutputFormat} +import org.apache.mahout.math.scalabindings._ +import org.apache.mahout.math._ +import org.apache.mahout.math.drm._ +import RLikeDrmOps._ +import RLikeOps._ +import math._ + +import org.apache.mahout.math.decompositions._ +import org.scalatest.{FunSuite, Matchers} + + +import scala.reflect.ClassTag +import org.apache.flink.api.scala._ + + + +class FailingTestsSuite extends FunSuite with DistributedFlinkSuite with Matchers { + +// // passing now +// test("Simple DataSet to IntWritable") { +// val path = TmpDir + "flinkOutput" +// +// implicit val typeInfo = createTypeInformation[(Int,Int)] +// val ds = env.fromElements[(Int,Int)]((1,2),(3,4),(5,6),(7,8)) +// // val job = new JobConf +// +// +// val writableDataset : DataSet[(IntWritable,IntWritable)] = +// ds.map( tuple => +// (new IntWritable(tuple._1.asInstanceOf[Int]), new IntWritable(tuple._2.asInstanceOf[Int])) +// ) +// +// val job: Job = new Job() +// +// job.setOutputKeyClass(classOf[IntWritable]) +// job.setOutputValueClass(classOf[IntWritable]) +// +// // setup sink for IntWritable +// val sequenceFormat = new SequenceFileOutputFormat[IntWritable, IntWritable] +// val hadoopOutput = new HadoopOutputFormat[IntWritable,IntWritable](sequenceFormat, job) +// FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(path)) +// +// writableDataset.output(hadoopOutput) +// +// env.execute(s"dfsWrite($path)") +// +// } + + + test("C = A + B, identically partitioned") { + + val inCoreA = dense((1, 2, 3), (3, 4, 5), (5, 6, 7)) + + val A = drmParallelize(inCoreA, numPartitions = 2) + + // printf("A.nrow=%d.\n", A.rdd.count()) + + // Create B which would be identically partitioned to A. mapBlock() by default will do the trick. + val B = A.mapBlock() { + case (keys, block) => + val bBlock = block.like() := { (r, c, v) => util.Random.nextDouble()} + keys -> bBlock + } + // Prevent repeated computation non-determinism + // flink problem is here... checkpoint is not doing what it should + // i.e. create a physical plan w/o side effects + .checkpoint() + + val inCoreB = B.collect + + printf("A=\n%s\n", inCoreA) + printf("B=\n%s\n", inCoreB) + + val C = A + B + + val inCoreC = C.collect + + printf("C=\n%s\n", inCoreC) + + // Actual + val inCoreCControl = inCoreA + inCoreB + + (inCoreC - inCoreCControl).norm should be < 1E-10 + } +//// Passing now.
+// test("C = inCoreA %*%: B") { +// +// val inCoreA = dense((1, 2, 3), (3, 4, 5), (4, 5, 6), (5, 6, 7)) +// val inCoreB = dense((3, 5, 7, 10), (4, 6, 9, 10), (5, 6, 7, 7)) +// +// val B = drmParallelize(inCoreB, numPartitions = 2) +// val C = inCoreA %*%: B +// +// val inCoreC = C.collect +// val inCoreCControl = inCoreA %*% inCoreB +// +// println(inCoreC) +// (inCoreC - inCoreCControl).norm should be < 1E-10 +// +// } + + test("dsqDist(X,Y)") { + val m = 100 + val n = 300 + val d = 7 + val mxX = Matrices.symmetricUniformView(m, d, 12345).cloned -= 5 + val mxY = Matrices.symmetricUniformView(n, d, 1234).cloned += 10 + val (drmX, drmY) = (drmParallelize(mxX, 3), drmParallelize(mxY, 4)) + + val mxDsq = dsqDist(drmX, drmY).collect + val mxDsqControl = new DenseMatrix(m, n) := { (r, c, _) â (mxX(r, ::) - mxY(c, ::)) ^= 2 sum } + (mxDsq - mxDsqControl).norm should be < 1e-7 + } + + test("dsqDist(X)") { + val m = 100 + val d = 7 + val mxX = Matrices.symmetricUniformView(m, d, 12345).cloned -= 5 + val drmX = drmParallelize(mxX, 3) + + val mxDsq = dsqDist(drmX).collect + val mxDsqControl = sqDist(drmX) + (mxDsq - mxDsqControl).norm should be < 1e-7 + } + +//// passing now +// test("DRM DFS i/o (local)") { +// +// val uploadPath = TmpDir + "UploadedDRM" +// +// val inCoreA = dense((1, 2, 3), (3, 4, 5)) +// val drmA = drmParallelize(inCoreA) +// +// drmA.dfsWrite(path = uploadPath) +// +// println(inCoreA) +// +// // Load back from hdfs +// val drmB = drmDfsRead(path = uploadPath) +// +// // Make sure keys are correctly identified as ints +// drmB.checkpoint(CacheHint.NONE).keyClassTag shouldBe ClassTag.Int +// +// // Collect back into in-core +// val inCoreB = drmB.collect +// +// // Print out to see what it is we collected: +// println(inCoreB) +// +// (inCoreA - inCoreB).norm should be < 1e-7 +// } + + + + test("dspca") { + + val rnd = RandomUtils.getRandom + + // Number of points + val m = 500 + // Length of actual spectrum + val spectrumLen = 40 + + val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3)) + printf("spectrum:%s\n", spectrum) + + val (u, _) = qr(new SparseRowMatrix(m, spectrumLen) := + ((r, c, v) => if (rnd.nextDouble() < 0.2) 0 else rnd.nextDouble() + 5.0)) + + // PCA Rotation matrix -- should also be orthonormal. + val (tr, _) = qr(Matrices.symmetricUniformView(spectrumLen, spectrumLen, rnd.nextInt) - 10.0) + + val input = (u %*%: diagv(spectrum)) %*% tr.t + val drmInput = drmParallelize(m = input, numPartitions = 2) + + // Calculate just first 10 principal factors and reduce dimensionality. + // Since we assert just validity of the s-pca, not stochastic error, we bump p parameter to + // ensure to zero stochastic error and assert only functional correctness of the method's pca- + // specific additions. + val k = 10 + + // Calculate just first 10 principal factors and reduce dimensionality. + var (drmPCA, _, s) = dspca(drmA = drmInput, k = 10, p = spectrumLen, q = 1) + // Un-normalized pca data: + drmPCA = drmPCA %*% diagv(s) + + val pca = drmPCA.checkpoint(CacheHint.NONE).collect + + // Of course, once we calculated the pca, the spectrum is going to be different since our originally + // generated input was not centered. 
So here, we'd just brute-solve pca to verify + val xi = input.colMeans() + for (r <- 0 until input.nrow) input(r, ::) -= xi + var (pcaControl, _, sControl) = svd(m = input) + pcaControl = (pcaControl %*%: diagv(sControl))(::, 0 until k) + + printf("pca:\n%s\n", pca(0 until 10, 0 until 10)) + printf("pcaControl:\n%s\n", pcaControl(0 until 10, 0 until 10)) + + (pca(0 until 10, 0 until 10).norm - pcaControl(0 until 10, 0 until 10).norm).abs should be < 1E-5 + + } + + test("dals") { + + val rnd = RandomUtils.getRandom + + // Number of points + val m = 500 + val n = 500 + + // Length of actual spectrum + val spectrumLen = 40 + + // Create singular values with decay + val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3)) + printf("spectrum:%s\n", spectrum) + + // Create A as an ideal input + val inCoreA = (qr(Matrices.symmetricUniformView(m, spectrumLen, 1234))._1 %*%: diagv(spectrum)) %*% + qr(Matrices.symmetricUniformView(n, spectrumLen, 2345))._1.t + val drmA = drmParallelize(inCoreA, numPartitions = 2) + + // Decompose using ALS + val (drmU, drmV, rmse) = dals(drmA = drmA, k = 20).toTuple + val inCoreU = drmU.collect + val inCoreV = drmV.collect + + val predict = inCoreU %*% inCoreV.t + + printf("Control block:\n%s\n", inCoreA(0 until 3, 0 until 3)) + printf("ALS factorized approximation block:\n%s\n", predict(0 until 3, 0 until 3)) + + val err = (inCoreA - predict).norm + printf("norm of residuals %f\n", err) + printf("train iteration rmses: %s\n", rmse) + + err should be < 15e-2 + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala index 6dcedd9..4aa524f 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala @@ -1,16 +1,9 @@ package org.apache.mahout.flinkbindings -import org.apache.mahout.flinkbindings._ -import org.apache.mahout.math._ -import org.apache.mahout.math.drm._ -import org.apache.mahout.math.drm.RLikeDrmOps._ -import org.apache.mahout.math.scalabindings._ import org.apache.mahout.math.scalabindings.RLikeOps._ -import org.junit.runner.RunWith +import org.apache.mahout.math.scalabindings._ import org.scalatest.FunSuite -import org.scalatest.junit.JUnitRunner -@RunWith(classOf[JUnitRunner]) class FlinkByteBCastSuite extends FunSuite { test("BCast vector") { http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala index 98318e3..3e14d76 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/RLikeOpsSuite.scala @@ -31,7 +31,6 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory -@RunWith(classOf[JUnitRunner]) class RLikeOpsSuite extends FunSuite with DistributedFlinkSuite { val LOGGER = LoggerFactory.getLogger(getClass())
http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala index 07d62dc..0a5f145 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/UseCasesSuite.scala @@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory import org.scalatest.FunSuite import org.scalatest.junit.JUnitRunner -@RunWith(classOf[JUnitRunner]) class UseCasesSuite extends FunSuite with DistributedFlinkSuite { val LOGGER = LoggerFactory.getLogger(getClass()) http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala index a766146..81ca737 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/blas/LATestSuite.scala @@ -23,15 +23,14 @@ import org.apache.mahout.math._ import scalabindings._ import RLikeOps._ import drm._ +import org.apache.flink.api.scala._ import org.apache.mahout.flinkbindings._ import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.apache.mahout.math.drm.logical.OpAx import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm -import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm import org.apache.mahout.math.drm.logical._ -@RunWith(classOf[JUnitRunner]) class LATestSuite extends FunSuite with DistributedFlinkSuite { test("Ax blockified") { http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DistributedDecompositionsSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DistributedDecompositionsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DistributedDecompositionsSuite.scala index f13597a..82ca3ff 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DistributedDecompositionsSuite.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DistributedDecompositionsSuite.scala @@ -12,7 +12,6 @@ import org.scalatest.junit.JUnitRunner import org.apache.mahout.math.decompositions.DistributedDecompositionsSuiteBase -@RunWith(classOf[JUnitRunner]) class DistributedDecompositionsSuite extends FunSuite with DistributedFlinkSuite with DistributedDecompositionsSuiteBase { http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala index 7f6b2c8..325d118 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala +++ 
b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeOpsSuite.scala @@ -12,7 +12,6 @@ import org.scalatest.junit.JUnitRunner import org.apache.mahout.math.decompositions.DistributedDecompositionsSuiteBase -@RunWith(classOf[JUnitRunner]) class DrmLikeOpsSuite extends FunSuite with DistributedFlinkSuite with DrmLikeOpsSuiteBase { http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala index 7cfc48b..dfa7360 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/DrmLikeSuite.scala @@ -12,7 +12,6 @@ import org.scalatest.junit.JUnitRunner import org.apache.mahout.math.decompositions.DistributedDecompositionsSuiteBase -@RunWith(classOf[JUnitRunner]) class DrmLikeSuite extends FunSuite with DistributedFlinkSuite with DrmLikeSuiteBase { http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala new file mode 100644 index 0000000..d6feed9 --- /dev/null +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala @@ -0,0 +1,11 @@ +package org.apache.mahout.flinkbindings.standard + +import org.apache.mahout.classifier.naivebayes.NBTestBase +import org.apache.mahout.flinkbindings._ +import org.scalatest.FunSuite + + +class NaiveBayesTestSuite extends FunSuite with DistributedFlinkSuite + with NBTestBase { + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala index 1ba03b1..c0ff76c 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/RLikeDrmOpsSuite.scala @@ -1,18 +1,10 @@ package org.apache.mahout.flinkbindings.standard import org.apache.mahout.flinkbindings._ -import org.apache.mahout.math._ import org.apache.mahout.math.drm._ -import org.apache.mahout.math.drm.RLikeDrmOps._ -import org.apache.mahout.math.scalabindings._ -import org.apache.mahout.math.scalabindings.RLikeOps._ -import org.junit.runner.RunWith import org.scalatest.FunSuite -import org.scalatest.junit.JUnitRunner -import org.apache.mahout.math.decompositions.DistributedDecompositionsSuiteBase -@RunWith(classOf[JUnitRunner]) class RLikeDrmOpsSuite extends FunSuite with DistributedFlinkSuite with RLikeDrmOpsSuiteBase { http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/math-scala/pom.xml ---------------------------------------------------------------------- diff --git a/math-scala/pom.xml b/math-scala/pom.xml index 
0124612..deaadc4 100644 --- a/math-scala/pom.xml +++ b/math-scala/pom.xml @@ -125,7 +125,7 @@ <dependency> <groupId>com.esotericsoftware.kryo</groupId> <artifactId>kryo</artifactId> - <version>2.21</version> + <version>2.24.0</version> </dependency> <!-- 3rd-party --> http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala index e8ac475..016171d 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesLeftMatrix.scala @@ -26,7 +26,7 @@ import scala.reflect.ClassTag /** Logical Times-left over in-core matrix operand */ case class OpTimesLeftMatrix( - val left: Matrix, + left: Matrix, override var A: DrmLike[Int] ) extends AbstractUnaryOp[Int, Int] { http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala index ecb557b..34b1823 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala @@ -45,7 +45,7 @@ package object drm { // type CacheHint = CacheHint.CacheHint def safeToNonNegInt(x: Long): Int = { - assert(x == x << -31 >>> -31, "transformation from long to Int is losing signficant bits, or is a negative number") + assert(x == x << -31 >>> -31, "transformation from long to Int is losing significant bits, or is a negative number") x.toInt } @@ -175,7 +175,7 @@ package object drm { import RLikeDrmOps._ val drmAcp = drmA.checkpoint() - + val mu = drmAcp colMeans // Compute variance using mean(x^2) - mean(x)^2 http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala index d0fd393..d72d2f0 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/MMul.scala @@ -24,7 +24,6 @@ import RLikeOps._ import org.apache.mahout.logging._ import scala.collection.JavaConversions._ -import scala.collection._ object MMul extends MMBinaryFunc { @@ -46,32 +45,32 @@ object MMul extends MMBinaryFunc { sd match { // Multiplication cases by a diagonal matrix. 
- case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.COLWISE, _) if (a - .isInstanceOf[DiagonalMatrix]) ⇒ jvmDiagCW _ - case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.SPARSECOLWISE, _) if (a - .isInstanceOf[DiagonalMatrix]) ⇒ jvmDiagCW _ - case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.ROWWISE, _) if (a - .isInstanceOf[DiagonalMatrix]) ⇒ jvmDiagRW _ - case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.SPARSEROWWISE, _) if (a - .isInstanceOf[DiagonalMatrix]) ⇒ jvmDiagRW _ - - case (TraversingStructureEnum.COLWISE, _, TraversingStructureEnum.VECTORBACKED, _) if (b - .isInstanceOf[DiagonalMatrix]) ⇒ jvmCWDiag _ - case (TraversingStructureEnum.SPARSECOLWISE, _, TraversingStructureEnum.VECTORBACKED, _) if (b - .isInstanceOf[DiagonalMatrix]) ⇒ jvmCWDiag _ - case (TraversingStructureEnum.ROWWISE, _, TraversingStructureEnum.VECTORBACKED, _) if (b - .isInstanceOf[DiagonalMatrix]) ⇒ jvmRWDiag _ - case (TraversingStructureEnum.SPARSEROWWISE, _, TraversingStructureEnum.VECTORBACKED, _) if (b - .isInstanceOf[DiagonalMatrix]) ⇒ jvmRWDiag _ + case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.COLWISE, _) + if a.isInstanceOf[DiagonalMatrix] ⇒ jvmDiagCW + case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.SPARSECOLWISE, _) + if a.isInstanceOf[DiagonalMatrix] ⇒ jvmDiagCW + case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.ROWWISE, _) + if a.isInstanceOf[DiagonalMatrix] ⇒ jvmDiagRW + case (TraversingStructureEnum.VECTORBACKED, _, TraversingStructureEnum.SPARSEROWWISE, _) + if a.isInstanceOf[DiagonalMatrix] ⇒ jvmDiagRW + + case (TraversingStructureEnum.COLWISE, _, TraversingStructureEnum.VECTORBACKED, _) + if b.isInstanceOf[DiagonalMatrix] ⇒ jvmCWDiag + case (TraversingStructureEnum.SPARSECOLWISE, _, TraversingStructureEnum.VECTORBACKED, _) + if b.isInstanceOf[DiagonalMatrix] ⇒ jvmCWDiag + case (TraversingStructureEnum.ROWWISE, _, TraversingStructureEnum.VECTORBACKED, _) + if b.isInstanceOf[DiagonalMatrix] ⇒ jvmRWDiag + case (TraversingStructureEnum.SPARSEROWWISE, _, TraversingStructureEnum.VECTORBACKED, _) + if b.isInstanceOf[DiagonalMatrix] ⇒ jvmRWDiag // Dense-dense cases - case (TraversingStructureEnum.ROWWISE, true, TraversingStructureEnum.COLWISE, true) if (a eq b.t) ⇒ jvmDRWAAt _ - case (TraversingStructureEnum.ROWWISE, true, TraversingStructureEnum.COLWISE, true) if (a.t eq b) ⇒ jvmDRWAAt _ + case (TraversingStructureEnum.ROWWISE, true, TraversingStructureEnum.COLWISE, true) if a eq b.t ⇒ jvmDRWAAt + case (TraversingStructureEnum.ROWWISE, true, TraversingStructureEnum.COLWISE, true) if a.t eq b ⇒ jvmDRWAAt case (TraversingStructureEnum.ROWWISE, true, TraversingStructureEnum.COLWISE, true) ⇒ jvmRWCW case (TraversingStructureEnum.ROWWISE, true, TraversingStructureEnum.ROWWISE, true) ⇒ jvmRWRW case (TraversingStructureEnum.COLWISE, true, TraversingStructureEnum.COLWISE, true) ⇒ jvmCWCW - case (TraversingStructureEnum.COLWISE, true, TraversingStructureEnum.ROWWISE, true) if ( a eq b.t) ⇒ jvmDCWAAt _ - case (TraversingStructureEnum.COLWISE, true, TraversingStructureEnum.ROWWISE, true) if ( a.t eq b) ⇒ jvmDCWAAt _ + case (TraversingStructureEnum.COLWISE, true, TraversingStructureEnum.ROWWISE, true) if a eq b.t ⇒ jvmDCWAAt + case (TraversingStructureEnum.COLWISE, true, TraversingStructureEnum.ROWWISE, true) if a.t eq b ⇒ jvmDCWAAt case (TraversingStructureEnum.COLWISE, true, TraversingStructureEnum.ROWWISE, true) ⇒ jvmCWRW // Sparse row matrix x sparse row
matrix (array of vectors) @@ -107,7 +106,7 @@ object MMul extends MMBinaryFunc { case (TraversingStructureEnum.COLWISE, false, TraversingStructureEnum.COLWISE, _) ⇒ jvmSparseCWCW2flips // Sparse methods are only effective if the first argument is sparse, so we need to do a swap. - case (_, _, _, false) ⇒ { (a, b, r) ⇒ apply(b.t, a.t, r.map {_.t}).t } + case (_, _, _, false) ⇒ (a, b, r) ⇒ apply(b.t, a.t, r.map {_.t}).t // Default jvm-jvm case. case _ ⇒ jvmRWCW http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala index b288c62..de8228e 100644 --- a/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala +++ b/math-scala/src/test/scala/org/apache/mahout/math/decompositions/DistributedDecompositionsSuiteBase.scala @@ -78,7 +78,7 @@ trait DistributedDecompositionsSuiteBase extends DistributedMahoutSuite with Mat printf("qControl2=\n%s\n", qControl2) printf("rControl2=\n%s\n", rControl2) - // Housholder approach seems to be a little bit more stable + // Householder approach seems to be a little bit more stable (rControl - inCoreR).norm should be < 1E-5 (qControl - inCoreQ).norm should be < 1E-5 @@ -86,7 +86,7 @@ trait DistributedDecompositionsSuiteBase extends DistributedMahoutSuite with Mat (rControl2 - inCoreR).norm should be < 1E-10 (qControl2 - inCoreQ).norm should be < 1E-10 - // Assert orhtogonality: + // Assert orthogonality: // (a) Q[,j] dot Q[,j] == 1.0 for all j // (b) Q[,i] dot Q[,j] == 0.0 for all i != j for (col <- 0 until inCoreQ.ncol) http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala index b46ee30..f18d23b 100644 --- a/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala +++ b/math-scala/src/test/scala/org/apache/mahout/math/drm/RLikeDrmOpsSuiteBase.scala @@ -347,6 +347,9 @@ trait RLikeDrmOpsSuiteBase extends DistributedMahoutSuite with Matchers { keys -> bBlock } // Prevent repeated computation non-determinism + // removing this checkpoint() will cause the same error in spark Tests + // as we're seeing in Flink with this test.
ie util.Random.nextDouble() + // is being called more than once (note that it is not seeded in the closure) .checkpoint() val inCoreB = B.collect http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala index 3869830..f6deb15 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkDriver.scala @@ -75,10 +75,10 @@ abstract class MahoutSparkDriver extends MahoutDriver { override protected def start() : Unit = { if (!_useExistingContext) { sparkConf.set("spark.kryo.referenceTracking", "false") - .set("spark.kryoserializer.buffer.mb", "200m")// this is default for Mahout optimizer, change it with -D option + .set("spark.kryoserializer.buffer.mb", "200")// this is default for Mahout optimizer, change it with -D option // the previous has been marked deprecated as of Spark 1.4 by the below line, // remove the above line when Spark finally retires above for below - .set("spark.kryoserializer.buffer", "200m") + .set("spark.kryoserializer.buffer", "200") if (parser.opts("sparkExecutorMem").asInstanceOf[String] != "") http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala index b3a1ec2..fde37bf 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/MahoutSparkOptionParser.scala @@ -26,17 +26,17 @@ class MahoutSparkOptionParser(programName: String) extends MahoutOptionParser(pr opts = opts + ("appName" -> programName) note("\nSpark config options:") - opt[String]("master") abbr ("ma") text ("Spark Master URL (optional). Default: \"local\". Note that you can " + + opt[String]("master") abbr "ma" text ("Spark Master URL (optional). Default: \"local\". Note that you can " + "specify the number of cores to get a performance improvement, for example \"local[4]\"") action { (x, options) => options + ("master" -> x) } - opt[String]("sparkExecutorMem") abbr ("sem") text ("Max Java heap available as \"executor memory\" on each " + + opt[String]("sparkExecutorMem") abbr "sem" text ("Max Java heap available as \"executor memory\" on each " + "node (optional). 
Default: as Spark config specifies") action { (x, options) => options + ("sparkExecutorMem" -> x) } - opt[(String, String)]("define") abbr ("D") unbounded() foreach { case (k, v) => + opt[(String, String)]("define") abbr "D" unbounded() foreach { case (k, v) => sparkConf.set(k, v) } validate { x => if (x._2 != "") success else failure("Value <sparkConfValue> must be non-blank") http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala index 817c6ff..2cedc20 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/RowSimilarityDriver.scala @@ -20,7 +20,6 @@ package org.apache.mahout.drivers import org.apache.mahout.common.HDFSPathSearch import org.apache.mahout.math.cf.SimilarityAnalysis import org.apache.mahout.math.indexeddataset.{Schema, IndexedDataset, indexedDatasetDFSRead} -import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark import scala.collection.immutable.HashMap /** @@ -63,7 +62,7 @@ object RowSimilarityDriver extends MahoutSparkDriver { opts = opts ++ RowSimilarityOptions note("\nAlgorithm control options:") - opt[Int]("maxObservations") abbr ("mo") action { (x, options) => + opt[Int]("maxObservations") abbr "mo" action { (x, options) => options + ("maxObservations" -> x) } text ("Max number of observations to consider per row (optional). Default: " + RowSimilarityOptions("maxObservations")) validate { x => @@ -96,7 +95,7 @@ object RowSimilarityDriver extends MahoutSparkDriver { //Jar inclusion, this option can be set when executing the driver from compiled code, not when from CLI parseGenericOptions() - help("help") abbr ("h") text ("prints this usage text\n") + help("help") abbr "h" text "prints this usage text\n" } parser.parse(args, parser.opts) map { opts => http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala index b5f76e0..d4f1aea 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/TextDelimitedReaderWriter.scala @@ -18,12 +18,12 @@ package org.apache.mahout.drivers import org.apache.log4j.Logger -import org.apache.mahout.math.indexeddataset._ -import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark -import org.apache.spark.SparkContext._ import org.apache.mahout.math.RandomAccessSparseVector -import org.apache.mahout.math.drm.{DrmLike, DrmLikeOps, DistributedContext, CheckpointedDrm} +import org.apache.mahout.math.drm.DistributedContext +import org.apache.mahout.math.indexeddataset._ import org.apache.mahout.sparkbindings._ +import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark + import scala.collection.JavaConversions._ /** @@ -269,7 +269,7 @@ trait TDIndexedDatasetWriter extends Writer[IndexedDatasetSpark]{ val vector = if (sort) itemList.sortBy { elem => -elem._2 } else itemList // first get the external rowID 
token - if (!vector.isEmpty){ + if (vector.nonEmpty){ var line = rowIDDictionary_bcast.value.inverse.getOrElse(rowID, "INVALID_ROW_ID") + rowKeyDelim // for the rest of the row, construct the vector contents of elements (external column ID, strength value) for (item <- vector) { http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala index e9f2f95..eeed97a 100644 --- a/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala +++ b/spark/src/main/scala/org/apache/mahout/drivers/TrainNBDriver.scala @@ -17,12 +17,11 @@ package org.apache.mahout.drivers -import org.apache.mahout.classifier.naivebayes._ -import org.apache.mahout.classifier.naivebayes.SparkNaiveBayes +import org.apache.mahout.classifier.naivebayes.{SparkNaiveBayes, _} import org.apache.mahout.common.Hadoop1HDFSUtil import org.apache.mahout.math.drm import org.apache.mahout.math.drm.DrmLike -import org.apache.mahout.math.drm.RLikeDrmOps.drm2RLikeOps + import scala.collection.immutable.HashMap @@ -48,33 +47,33 @@ object TrainNBDriver extends MahoutSparkDriver { // default trainComplementary is false opts = opts + ("trainComplementary" -> false) - opt[Unit]("trainComplementary") abbr ("c") action { (_, options) => + opt[Unit]("trainComplementary") abbr "c" action { (_, options) => options + ("trainComplementary" -> true) - } text ("Train a complementary model, Default: false.") + } text "Train a complementary model, Default: false." // Laplace smoothing parameter default is 1.0 opts = opts + ("alphaI" -> 1.0) - opt[Double]("alphaI") abbr ("a") action { (x, options) => + opt[Double]("alphaI") abbr "a" action { (x, options) => options + ("alphaI" -> x) - } text ("Laplace soothing factor default is 1.0") validate { x => + } text "Laplace smoothing factor default is 1.0" validate { x => if (x > 0) success else failure("Option --alphaI must be > 0") } // Overwrite the output directory (with the model) if it exists? Default: false opts = opts + ("overwrite" -> false) - opt[Unit]("overwrite") abbr ("ow") action { (_, options) => + opt[Unit]("overwrite") abbr "ow" action { (_, options) => options + ("overwrite" -> true) - } text ("Overwrite the output directory (with the model) if it exists? Default: false") + } text "Overwrite the output directory (with the model) if it exists?
Default: false" // Spark config options--not driver specific parseSparkOptions() - help("help") abbr ("h") text ("prints this usage text\n") + help("help") abbr "h" text "prints this usage text\n" } parser.parse(args, parser.opts) map { opts => parser.opts = opts - process() + process } } http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala index 4d13a5a..96ba8cd 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala @@ -17,7 +17,7 @@ package org.apache.mahout.sparkbindings -import org.apache.mahout.math.drm.{DistributedEngine, BCast, DistributedContext} +import org.apache.mahout.math.drm.{DistributedEngine, DistributedContext} import org.apache.spark.SparkContext class SparkDistributedContext(val sc: SparkContext) extends DistributedContext { http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala index ffb164c..676b496 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala @@ -85,7 +85,7 @@ object ABt { s"A=${operator.A.nrow}x${operator.A.ncol}, B=${operator.B.nrow}x${operator.B.ncol},AB'=${prodNRow}x$prodNCol." ) - // blockwise multimplication function + // blockwise multiplication function def mmulFunc(tupleA: BlockifiedDrmTuple[K], tupleB: BlockifiedDrmTuple[Int]): (Array[K], Array[Int], Matrix) = { val (keysA, blockA) = tupleA val (keysB, blockB) = tupleB http://git-wip-us.apache.org/repos/asf/mahout/blob/072289a4/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala index a9dc874..4c75e75 100644 --- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala +++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/DistributedSparkSuite.scala @@ -40,8 +40,8 @@ trait DistributedSparkSuite extends DistributedMahoutSuite with LoggerConfigurat // Do not run MAHOUT_HOME jars in unit tests. addMahoutJars = !isLocal, sparkConf = new SparkConf() - .set("spark.kryoserializer.buffer.mb", "40m") - .set("spark.kryoserializer.buffer", "40m") + .set("spark.kryoserializer.buffer.mb", "40") + .set("spark.kryoserializer.buffer", "40") .set("spark.akka.frameSize", "30") .set("spark.default.parallelism", "10") .set("spark.executor.memory", "2G")
