Repository: mahout
Updated Branches:
  refs/heads/master 45d88031c -> 3dd18344a


MAHOUT-1573: More explicit parallelism adjustments in math-scala DRM APIs; elements of automatic parallelism management

This closes apache/mahout#13.
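
For readers skimming the change, here is a minimal usage sketch of the new levers (the object name, master URL, and matrix values are illustrative, not part of the patch):

    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._
    import org.apache.mahout.sparkbindings._

    object ParDemo extends App {
      // Local context for illustration only.
      implicit val ctx = mahoutSparkContext(masterUrl = "local[2]", appName = "par-demo")

      val drmA = drmParallelize(dense((1, 2, 3), (2, 3, 4)), numPartitions = 2)

      drmA.par(min = 4)      // ensure at least 4 splits downstream
      drmA.par(exact = 8)    // force exactly 8 splits
      drmA.par(auto = true)  // let the engine pick a parallelism adjustment
    }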

Squashed commit of the following:

commit de03a6aa8361424ee8fb776f995fbe1b811e0ccd
Author: Dmitriy Lyubimov <[email protected]>
Date:   Wed Jun 18 14:22:46 2014 -0700

    doc

commit f399f7a4dc61905ef05d9944dbd2e5a4c31a654b
Merge: b02cf18 45d8803
Author: Dmitriy Lyubimov <[email protected]>
Date:   Wed Jun 18 14:18:35 2014 -0700

    Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/mahout into MAHOUT-1573

commit b02cf18dac608e2f969a845efd7ee35a3a5bd0e0
Author: Dmitriy Lyubimov <[email protected]>
Date:   Wed Jun 18 14:18:03 2014 -0700

    switching to par(...) api

commit 06bb4bcdedb3c5aac50a44ddec3957f1ed12d808
Author: Dmitriy Lyubimov <[email protected]>
Date:   Fri Jun 6 23:05:12 2014 -0700

    pom fixes to fix tests (spark hadoop dependencies must precede that of mahout-math's for tests to run correctly)

commit 1e6e3f87ffab5897569fea94c643da2bdbb59e33
Author: Dmitriy Lyubimov <[email protected]>
Date:   Fri Jun 6 22:15:33 2014 -0700

    remove any special handling of Par in rewrites

commit 2f3b4f5bac901ec152453473d4982cb9b6e5d651
Author: Dmitriy Lyubimov <[email protected]>
Date:   Fri Jun 6 16:33:27 2014 -0700

    + auto_|| operator

commit 2733002f4b5db3d5114a440b03967d954a3738e9
Author: Dmitriy Lyubimov <[email protected]>
Date:   Fri Jun 6 15:56:26 2014 -0700

    explicit parallelism adjustment levers exact_|| and min_||

commit cf7f18b4af4ad043d2bdcefeeda15031fa018543
Author: Dmitriy Lyubimov <[email protected]>
Date:   Fri Jun 6 13:21:19 2014 -0700

    docs

commit 2f785109b9a52e748626ba46f5bc0a35ffc98e2c
Author: Dmitriy Lyubimov <[email protected]>
Date:   Fri Jun 6 13:02:11 2014 -0700

    Refactoring drmFromHDFS()


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/3dd18344
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/3dd18344
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/3dd18344

Branch: refs/heads/master
Commit: 3dd18344a47fb86b5127bcf3e051a2eb4e7ca996
Parents: 45d8803
Author: Dmitriy Lyubimov <[email protected]>
Authored: Wed Jun 18 14:33:07 2014 -0700
Committer: Dmitriy Lyubimov <[email protected]>
Committed: Wed Jun 18 14:33:07 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |  4 +-
 .../mahout/math/drm/DistributedEngine.scala     | 12 +++-
 .../org/apache/mahout/math/drm/DrmLikeOps.scala | 33 ++++++++++-
 .../math/drm/logical/AbstractBinaryOp.scala     | 15 +++++
 .../apache/mahout/math/drm/logical/OpPar.scala  | 18 ++++++
 spark/pom.xml                                   | 13 ++--
 .../mahout/sparkbindings/SparkEngine.scala      | 62 +++++++++++---------
 .../apache/mahout/sparkbindings/blas/Par.scala  | 50 ++++++++++++++++
 .../mahout/sparkbindings/drm/DrmRddInput.scala  |  4 ++
 .../sparkbindings/drm/DrmLikeOpsSuite.scala     | 24 ++++++++
 .../sparkbindings/test/MahoutLocalContext.scala |  1 +
 11 files changed, 198 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/3dd18344/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 47aacf6..c9b6a0d 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,7 +2,9 @@ Mahout Change Log
 
 Release 1.0 - unreleased
 
-  MAHOUT-1580 Optimize getNumNonZeroElements() (ssc)
+  MAHOUT-1573: More explicit parallelism adjustments in math-scala DRM APIs; elements of automatic parallelism management (dlyubimov)
+
+  MAHOUT-1580: Optimize getNumNonZeroElements() (ssc)
   
   MAHOUT-1464: Cooccurrence Analysis on Spark (pat)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/3dd18344/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
index f136981..03471fd 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -24,6 +24,7 @@ import scalabindings._
 import RLikeOps._
 import DistributedEngine._
 import org.apache.mahout.math.scalabindings._
+import org.apache.log4j.Logger
 
 /** Abstraction of optimizer/distributed engine */
 trait DistributedEngine {
@@ -60,8 +61,13 @@ trait DistributedEngine {
   /** Broadcast support */
   def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix]
 
-  /** Load DRM from hdfs (as in Mahout DRM format) */
-  def drmFromHDFS (path: String)(implicit sc: DistributedContext): CheckpointedDrm[_]
+  /**
+   * Load DRM from HDFS (as in Mahout DRM format).
+   * <P/>
+   * @param path The DFS path to load from
+   * @param parMin Minimum parallelism after load (equivalent to #par(min=...)).
+   */
+  def drmFromHDFS(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_]
 
   /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */
   def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
@@ -82,6 +88,8 @@ trait DistributedEngine {
 
 object DistributedEngine {
 
+  private val log = Logger.getLogger(DistributedEngine.getClass)
+
   /** This is mostly multiplication operations rewrites */
   private def pass1[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {
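
As a hedged sketch of the new loader contract (here `engine` stands for any concrete DistributedEngine such as SparkEngine, and the path is hypothetical):

    // parMin = 0 (the default) keeps the previous behavior; a positive value
    // requests at least that many input splits from the backend.
    val drm = engine.drmFromHDFS(path = "hdfs:///mahout/drm-A", parMin = 32)(ctx)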
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/3dd18344/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
index 328805a..bc937d6 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
@@ -19,10 +19,39 @@ package org.apache.mahout.math.drm
 
 import scala.reflect.ClassTag
 import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.math.drm.logical.{OpMapBlock, OpRowRange}
+import org.apache.mahout.math.drm.logical.{OpPar, OpMapBlock, OpRowRange}
 
 /** Common Drm ops */
-class DrmLikeOps[K : ClassTag](protected[drm] val drm: DrmLike[K]) {
+class DrmLikeOps[K: ClassTag](protected[drm] val drm: DrmLike[K]) {
+
+  /**
+   * Parallelism adjustments. <P/>
+   *
+   * Change only one of the parameters from its default value to choose a parallelism adjustment strategy.
+   * <P/>
+   *
+   * E.g. use
+   * <pre>
+   *   drmA.par(auto = true)
+   * </pre>
+   * to use automatic parallelism adjustment.
+   * <P/>
+   *
+   * Parallelism in this API is a fairly abstract concept, and the actual interpretation of the
+   * value is left to a particular backend strategy. However, it is usually equivalent to the
+   * number of map tasks or data splits.
+   * <P/>
+   *
+   * @param min If changed from default, ensures the product has at least that much parallelism.
+   * @param exact If changed from default, ensures the pipeline product has exactly that much
+   *              parallelism.
+   * @param auto If changed from default, an engine-specific automatic parallelism adjustment
+   *             strategy is applied.
+   */
+  def par(min: Int = -1, exact: Int = -1, auto: Boolean = false) = {
+    assert(min >= 0 || exact >= 0 || auto, "Invalid argument")
+    OpPar(drm, minSplits = min, exactSplits = exact)
+  }
 
   /**
   * Map matrix block-wise vertically. Blocks of the new matrix can be modified original block
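
To make the par(...) contract above concrete, a short illustrative sketch (`drmA` stands for any DrmLike):

    drmA.par(min = 4)      // at-least semantics: raise parallelism to >= 4 if needed
    drmA.par(exact = 4)    // exact semantics: force exactly 4 splits
    drmA.par(auto = true)  // engine-specific automatic adjustment
    // drmA.par()          // AssertionError: at least one lever must be chosen

Note that par(...) is lazy: it only records an OpPar node in the logical plan; the physical repartitioning happens when the plan is executed (see blas.Par below).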

http://git-wip-us.apache.org/repos/asf/mahout/blob/3dd18344/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
index c2371d1..efd60ab 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
@@ -20,6 +20,21 @@ package org.apache.mahout.math.drm.logical
 import scala.reflect.ClassTag
 import org.apache.mahout.math.drm.{DistributedContext, DrmLike}
 
+/**
+ * Any logical binary operator (such as A + B).
+ * <P/>
+ *
+ * Any logical operator derived from this is also capable of triggering optimizer checkpoint, hence,
+ * it also inherits CheckpointAction.
+ * <P/>
+ * 
+ * @param evidence$1 LHS key type tag
+ * @param evidence$2 RHS key type tag
+ * @param evidence$3 expression key type tag
+ * @tparam A LHS key type
+ * @tparam B RHS key type
+ * @tparam K result key type
+ */
 abstract class AbstractBinaryOp[A: ClassTag, B: ClassTag, K: ClassTag]
     extends CheckpointAction[K] with DrmLike[K] {
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/3dd18344/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala
new file mode 100644
index 0000000..f438728
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala
@@ -0,0 +1,18 @@
+package org.apache.mahout.math.drm.logical
+
+import org.apache.mahout.math.drm.DrmLike
+import scala.reflect.ClassTag
+
+/** Parallelism operator */
+case class OpPar[K: ClassTag](
+    override var A: DrmLike[K],
+    val minSplits: Int = -1,
+    val exactSplits: Int = -1)
+    extends AbstractUnaryOp[K, K] {
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long = A.nrow
+
+  /** R-like syntax for number of columns */
+  def ncol: Int = A.ncol
+}
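
Cross-referencing with par(...) above, the levers map onto this node as follows (a summary, values illustrative):

    // par(min = 4)     => OpPar(drmA, minSplits = 4,  exactSplits = -1)
    // par(exact = 8)   => OpPar(drmA, minSplits = -1, exactSplits = 8)
    // par(auto = true) => OpPar(drmA, minSplits = -1, exactSplits = -1)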

http://git-wip-us.apache.org/repos/asf/mahout/blob/3dd18344/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index ac99ffd..5dc566f 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -263,6 +263,13 @@
 
   <dependencies>
 
+    <!-- spark stuff - need to put this first so that spark's hadoop dependencies precede mahout-math's in tests -->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.major}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.mahout</groupId>
       <artifactId>mahout-math-scala</artifactId>
@@ -288,12 +295,6 @@
 
 
     <!--  3rd-party -->
-    <!-- spark stuff -->
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.major}</artifactId>
-      <version>${spark.version}</version>
-    </dependency>
 
     <!-- scala stuff -->
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/3dd18344/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index a4eef9d..996eb1b 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -32,6 +32,7 @@ import scala.collection.JavaConversions._
 import org.apache.spark.SparkContext
 import org.apache.mahout.math.drm._
 import org.apache.mahout.math.drm.RLikeDrmOps._
+import org.apache.spark.rdd.RDD
 
 /** Spark-specific non-drm-method operations */
 object SparkEngine extends DistributedEngine {
@@ -124,40 +125,46 @@ object SparkEngine extends DistributedEngine {
    *
    * @return DRM[Any] where Any is automatically translated to value type
    */
-  def drmFromHDFS (path: String)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
-    implicit val scc:SparkContext = sc
-    val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable]).map(t => (t._1, t._2.get()))
-
-    val key = rdd.map(_._1).take(1)(0)
-    val keyWClass = key.getClass.asSubclass(classOf[Writable])
-
-    val key2val = key match {
-      case xx: IntWritable => (v: AnyRef) => v.asInstanceOf[IntWritable].get
-      case xx: Text => (v: AnyRef) => v.asInstanceOf[Text].toString
-      case xx: LongWritable => (v: AnyRef) => v.asInstanceOf[LongWritable].get
-      case xx: Writable => (v: AnyRef) => v
-    }
+  def drmFromHDFS(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
 
-    val val2key = key match {
-      case xx: IntWritable => (x: Any) => new IntWritable(x.asInstanceOf[Int])
-      case xx: Text => (x: Any) => new Text(x.toString)
-      case xx: LongWritable => (x: Any) => new LongWritable(x.asInstanceOf[Int])
-      case xx: Writable => (x: Any) => x.asInstanceOf[Writable]
-    }
+    val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minSplits = parMin)
+        // Get rid of VectorWritable
+        .map(t => (t._1, t._2.get()))
+
+    def getKeyClassTag[K: ClassTag, V](rdd: RDD[(K, V)]) = implicitly[ClassTag[K]]
+
+    // Spark should've loaded the type info from the header, right?
+    val keyTag = getKeyClassTag(rdd)
+
+    val (key2valFunc, val2keyFunc, unwrappedKeyTag) = keyTag match {
+
+      case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[IntWritable]]) => (
+          (v: AnyRef) => v.asInstanceOf[IntWritable].get,
+          (x: Any) => new IntWritable(x.asInstanceOf[Int]),
+          implicitly[ClassTag[Int]])
+
+      case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[Text]]) => (
+          (v: AnyRef) => v.asInstanceOf[Text].toString,
+          (x: Any) => new Text(x.toString),
+          implicitly[ClassTag[String]])
+
+      case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[LongWritable]]) => (
+          (v: AnyRef) => v.asInstanceOf[LongWritable].get,
+          (x: Any) => new LongWritable(x.asInstanceOf[Long]),
+          implicitly[ClassTag[Long]])
 
-    val  km = key match {
-      case xx: IntWritable => implicitly[ClassTag[Int]]
-      case xx: Text => implicitly[ClassTag[String]]
-      case xx: LongWritable => implicitly[ClassTag[Long]]
-      case xx: Writable => ClassTag(classOf[Writable])
+      case xx: ClassTag[Writable] => (
+          (v: AnyRef) => v,
+          (x: Any) => x.asInstanceOf[Writable],
+          ClassTag(classOf[Writable]))
     }
 
     {
-      implicit def getWritable(x: Any): Writable = val2key(x)
+      implicit def getWritable(x: Any): Writable = val2keyFunc(x)
       new CheckpointedDrmSpark(
-        rdd = rdd.map(t => (key2val(t._1), t._2)),
+        rdd = rdd.map(t => (key2valFunc(t._1), t._2)),
         _cacheStorageLevel = StorageLevel.MEMORY_ONLY
-      )(km.asInstanceOf[ClassTag[Any]])
+      )(unwrappedKeyTag.asInstanceOf[ClassTag[Any]])
     }
   }
 
@@ -254,6 +261,7 @@ object SparkEngine extends DistributedEngine {
         ncol = blockOp.ncol,
         bmf = blockOp.bmf
       )
+      case op @ OpPar(a, _, _) => Par.exec(op, tr2phys(a)(op.classTagA))
       case cp: CheckpointedDrm[K] => new DrmRddInput[K](rowWiseSrc = Some((cp.ncol, cp.rdd)))
       case _ => throw new IllegalArgumentException("Internal: Optimizer has no exec policy for operator %s."
           .format(oper))
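
For reference, the key translation the refactored loader effectively performs, summarized (not code from the patch):

    // sequence file key Writable -> DRM key type after load
    //   IntWritable  -> Int
    //   Text         -> String
    //   LongWritable -> Long
    //   any other Writable is kept as-is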

http://git-wip-us.apache.org/repos/asf/mahout/blob/3dd18344/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
new file mode 100644
index 0000000..e73376d
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
@@ -0,0 +1,50 @@
+package org.apache.mahout.sparkbindings.blas
+
+import scala.reflect.ClassTag
+import org.apache.mahout.sparkbindings.drm.DrmRddInput
+import org.apache.mahout.math.drm.logical.OpPar
+import org.apache.spark.rdd.RDD
+
+/** Physical adjustment of parallelism */
+object Par {
+
+  def exec[K: ClassTag](op: OpPar[K], src: DrmRddInput[K]): DrmRddInput[K] = {
+
+    def adjust[T](rdd: RDD[T]): RDD[T] =
+      if (op.minSplits > 0) {
+        if (rdd.partitions.size < op.minSplits)
+          rdd.coalesce(op.minSplits, shuffle = true)
+        else rdd.coalesce(rdd.partitions.size)
+      } else if (op.exactSplits > 0) {
+        if (op.exactSplits < rdd.partitions.size)
+          rdd.coalesce(numPartitions = op.exactSplits, shuffle = false)
+        else if (op.exactSplits > rdd.partitions.size)
+          rdd.coalesce(numPartitions = op.exactSplits, shuffle = true)
+        else
+          rdd.coalesce(rdd.partitions.size)
+      } else if (op.exactSplits == -1 && op.minSplits == -1) {
+
+        // auto adjustment, try to scale up to either x1Size or x2Size.
+        val clusterSize = rdd.context.getConf.get("spark.default.parallelism", "1").toInt
+
+        val x1Size = (clusterSize * .95).ceil.toInt
+        val x2Size = (clusterSize * 1.9).ceil.toInt
+
+        if (rdd.partitions.size <= x1Size)
+          rdd.coalesce(numPartitions = x1Size, shuffle = true)
+        else if (rdd.partitions.size <= x2Size)
+          rdd.coalesce(numPartitions = x2Size, shuffle = true)
+        else
+          rdd.coalesce(numPartitions = rdd.partitions.size)
+      } else rdd.coalesce(rdd.partitions.size)
+
+    if (src.isBlockified) {
+      val rdd = src.toBlockifiedDrmRdd()
+      new DrmRddInput[K](blockifiedSrc = Some(adjust(rdd)))
+    } else {
+      val rdd = src.toDrmRdd()
+      new DrmRddInput[K](rowWiseSrc = Some(op.ncol -> adjust(rdd)))
+    }
+  }
+
+}
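
A worked example of the auto branch, assuming spark.default.parallelism = 10 (the value the test harness below sets):

    // clusterSize = 10
    // x1Size = ceil(10 * 0.95) = 10   // first scale-up target
    // x2Size = ceil(10 * 1.9)  = 19   // second scale-up target
    //
    // input partitions            -> output partitions
    //   2   (<= x1Size)           -> 10
    //   11  (> x1Size, <= x2Size) -> 19
    //   20  (> x2Size)            -> 20 (left unchanged)

These are exactly the figures exercised by the DrmLikeOpsSuite expectations below.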

http://git-wip-us.apache.org/repos/asf/mahout/blob/3dd18344/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
index 3801c77..b72818c 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
@@ -32,6 +32,10 @@ class DrmRddInput[K: ClassTag](
 
   private lazy val backingRdd = rowWiseSrc.map(_._2).getOrElse(blockifiedSrc.get)
 
+  def isBlockified: Boolean = blockifiedSrc.isDefined
+
+  def isRowWise: Boolean = rowWiseSrc.isDefined
+
   def toDrmRdd(): DrmRdd[K] = rowWiseSrc.map(_._2).getOrElse(deblockify(rdd = blockifiedSrc.get))
 
   def toBlockifiedDrmRdd() = blockifiedSrc.getOrElse(blockify(rdd = rowWiseSrc.get._2, blockncol = rowWiseSrc.get._1))
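
These predicates let blas.Par (above) preserve the input representation; in sketch form:

    // blockified input stays blockified; row-wise input stays row-wise:
    //   if (src.isBlockified) adjust(src.toBlockifiedDrmRdd())  // -> blockifiedSrc
    //   else                  adjust(src.toDrmRdd())            // -> rowWiseSrc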

http://git-wip-us.apache.org/repos/asf/mahout/blob/3dd18344/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
index 6c71e11..81ffccf 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
@@ -23,6 +23,7 @@ import scalabindings._
 import drm._
 import RLikeOps._
 import RLikeDrmOps._
+import org.apache.mahout.sparkbindings._
 import org.scalatest.FunSuite
 import org.apache.mahout.sparkbindings.test.MahoutLocalContext
 
@@ -91,4 +92,27 @@ class DrmLikeOpsSuite extends FunSuite with MahoutLocalContext {
 
   }
 
+  test("exact, min and auto ||") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))
+    val A = drmParallelize(m = inCoreA, numPartitions = 2)
+
+    A.rdd.partitions.size should equal(2)
+
+    (A + 1.0).par(exact = 4).rdd.partitions.size should equal(4)
+    A.par(exact = 2).rdd.partitions.size should equal(2)
+    A.par(exact = 1).rdd.partitions.size should equal(1)
+    A.par(exact = 0).rdd.partitions.size should equal(2) // No effect for par <= 0
+    A.par(min = 4).rdd.partitions.size should equal(4)
+    A.par(min = 2).rdd.partitions.size should equal(2)
+    A.par(min = 1).rdd.partitions.size should equal(2)
+    A.par(auto = true).rdd.partitions.size should equal(10)
+    A.par(exact = 10).par(auto = true).rdd.partitions.size should equal(10)
+    A.par(exact = 11).par(auto = true).rdd.partitions.size should equal(19)
+    A.par(exact = 20).par(auto = true).rdd.partitions.size should equal(20)
+
+    intercept[AssertionError] {
+      A.par()
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/3dd18344/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
index d9e89bc..c48cfc7 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala
@@ -21,6 +21,7 @@ trait MahoutLocalContext extends MahoutSuite with LoggerConfiguration {
       sparkConf = new SparkConf()
           .set("spark.kryoserializer.buffer.mb", "15")
           .set("spark.akka.frameSize", "30")
+          .set("spark.default.parallelism", "10")
     )
   }
 
