Repository: mahout
Updated Branches:
  refs/heads/master 45d88031c -> 3dd18344a
MAHOUT-1573: More explicit parallelism adjustments in math-scala DRM apis; elements of automatic parallelism management

This closes apache/mahout#13.

Squashed commit of the following:

commit de03a6aa8361424ee8fb776f995fbe1b811e0ccd
Author: Dmitriy Lyubimov <[email protected]>
Date:   Wed Jun 18 14:22:46 2014 -0700

    doc

commit f399f7a4dc61905ef05d9944dbd2e5a4c31a654b
Merge: b02cf18 45d8803
Author: Dmitriy Lyubimov <[email protected]>
Date:   Wed Jun 18 14:18:35 2014 -0700

    Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/mahout into MAHOUT-1573

commit b02cf18dac608e2f969a845efd7ee35a3a5bd0e0
Author: Dmitriy Lyubimov <[email protected]>
Date:   Wed Jun 18 14:18:03 2014 -0700

    switching to par(...) api

commit 06bb4bcdedb3c5aac50a44ddec3957f1ed12d808
Author: Dmitriy Lyubimov <[email protected]>
Date:   Fri Jun 6 23:05:12 2014 -0700

    pom fixes to fix tests (spark hadoop dependencies must precede that of mahout-math's for tests to run correctly)

commit 1e6e3f87ffab5897569fea94c643da2bdbb59e33
Author: Dmitriy Lyubimov <[email protected]>
Date:   Fri Jun 6 22:15:33 2014 -0700

    remove any special handling of Par in rewrites

commit 2f3b4f5bac901ec152453473d4982cb9b6e5d651
Author: Dmitriy Lyubimov <[email protected]>
Date:   Fri Jun 6 16:33:27 2014 -0700

    + auto_|| operator

commit 2733002f4b5db3d5114a440b03967d954a3738e9
Author: Dmitriy Lyubimov <[email protected]>
Date:   Fri Jun 6 15:56:26 2014 -0700

    explicit parallelism adjustment levers exact_|| and min_||

commit cf7f18b4af4ad043d2bdcefeeda15031fa018543
Author: Dmitriy Lyubimov <[email protected]>
Date:   Fri Jun 6 13:21:19 2014 -0700

    docs

commit 2f785109b9a52e748626ba46f5bc0a35ffc98e2c
Author: Dmitriy Lyubimov <[email protected]>
Date:   Fri Jun 6 13:02:11 2014 -0700

    Refactoring drmFromHDFS()


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/3dd18344
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/3dd18344
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/3dd18344

Branch: refs/heads/master
Commit: 3dd18344a47fb86b5127bcf3e051a2eb4e7ca996
Parents: 45d8803
Author: Dmitriy Lyubimov <[email protected]>
Authored: Wed Jun 18 14:33:07 2014 -0700
Committer: Dmitriy Lyubimov <[email protected]>
Committed: Wed Jun 18 14:33:07 2014 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |  4 +-
 .../mahout/math/drm/DistributedEngine.scala     | 12 +++-
 .../org/apache/mahout/math/drm/DrmLikeOps.scala | 33 ++++++++++-
 .../math/drm/logical/AbstractBinaryOp.scala     | 15 +++++
 .../apache/mahout/math/drm/logical/OpPar.scala  | 18 ++++++
 spark/pom.xml                                   | 13 ++--
 .../mahout/sparkbindings/SparkEngine.scala      | 62 +++++++++++---------
 .../apache/mahout/sparkbindings/blas/Par.scala  | 50 ++++++++++++++++
 .../mahout/sparkbindings/drm/DrmRddInput.scala  |  4 ++
 .../sparkbindings/drm/DrmLikeOpsSuite.scala     | 24 ++++++++
 .../sparkbindings/test/MahoutLocalContext.scala |  1 +
 11 files changed, 198 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/3dd18344/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 47aacf6..c9b6a0d 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,7 +2,9 @@ Mahout Change Log
 
 Release 1.0 - unreleased
 
-  MAHOUT-1580 Optimize getNumNonZeroElements() (ssc)
+  MAHOUT-1573: More explicit parallelism adjustments in math-scala DRM apis; elements of automatic parallelism management (dlyubimov)
+
+  MAHOUT-1580: Optimize getNumNonZeroElements() (ssc)
 
   MAHOUT-1464: Cooccurrence Analysis on Spark (pat)


http://git-wip-us.apache.org/repos/asf/mahout/blob/3dd18344/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
index f136981..03471fd 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -24,6 +24,7 @@ import scalabindings._
 import RLikeOps._
 import DistributedEngine._
 import org.apache.mahout.math.scalabindings._
+import org.apache.log4j.Logger
 
 /** Abstraction of optimizer/distributed engine */
 trait DistributedEngine {
@@ -60,8 +61,13 @@ trait DistributedEngine {
   /** Broadcast support */
   def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix]
 
-  /** Load DRM from hdfs (as in Mahout DRM format) */
-  def drmFromHDFS (path: String)(implicit sc: DistributedContext): CheckpointedDrm[_]
+  /**
+   * Load DRM from HDFS (as in Mahout DRM format).
+   * <P/>
+   * @param path The DFS path to load from
+   * @param parMin Minimum parallelism after load (equivalent to #par(min=...)).
+   */
+  def drmFromHDFS(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_]
 
   /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */
   def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
@@ -82,6 +88,8 @@ trait DistributedEngine {
 
 object DistributedEngine {
 
+  private val log = Logger.getLogger(DistributedEngine.getClass)
+
   /** This is mostly multiplication operations rewrites */
   private def pass1[K: ClassTag](action: DrmLike[K]): DrmLike[K] = {


http://git-wip-us.apache.org/repos/asf/mahout/blob/3dd18344/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
index 328805a..bc937d6 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
@@ -19,10 +19,39 @@ package org.apache.mahout.math.drm
 
 import scala.reflect.ClassTag
 import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.math.drm.logical.{OpMapBlock, OpRowRange}
+import org.apache.mahout.math.drm.logical.{OpPar, OpMapBlock, OpRowRange}
 
 /** Common Drm ops */
-class DrmLikeOps[K : ClassTag](protected[drm] val drm: DrmLike[K]) {
+class DrmLikeOps[K: ClassTag](protected[drm] val drm: DrmLike[K]) {
+
+  /**
+   * Parallelism adjustments. <P/>
+   *
+   * Change only one of the parameters from its default value to choose a parallelism adjustment
+   * strategy.
+   * <P/>
+   *
+   * E.g. use
+   * <pre>
+   *   drmA.par(auto = true)
+   * </pre>
+   * to apply automatic parallelism adjustment.
+   * <P/>
+   *
+   * Parallelism is a fairly abstract concept in this API, and interpretation of the actual value
+   * is left to the particular backend strategy. However, it is usually equivalent to the number
+   * of map tasks or data splits.
+   * <P/>
+   *
+   * @param min If changed from default, ensures the product has at least that much parallelism.
+   * @param exact If changed from default, ensures the pipeline product has exactly that much
+   *              parallelism.
+   * @param auto If changed from default, an engine-specific automatic parallelism adjustment
+   *             strategy is applied.
+   */
+  def par(min: Int = -1, exact: Int = -1, auto: Boolean = false) = {
+    assert(min >= 0 || exact >= 0 || auto, "Invalid argument")
+    OpPar(drm, minSplits = min, exactSplits = exact)
+  }
 
   /**
    * Map matrix block-wise vertically. Blocks of the new matrix can be modified original block


http://git-wip-us.apache.org/repos/asf/mahout/blob/3dd18344/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
index c2371d1..efd60ab 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
@@ -20,6 +20,21 @@ package org.apache.mahout.math.drm.logical
 import scala.reflect.ClassTag
 import org.apache.mahout.math.drm.{DistributedContext, DrmLike}
 
+/**
+ * Any logical binary operator (such as A + B).
+ * <P/>
+ *
+ * Any logical operator derived from this is also capable of triggering an optimizer checkpoint;
+ * hence, it also inherits CheckpointAction.
+ * <P/>
+ *
+ * @param evidence$1 LHS key type tag
+ * @param evidence$2 RHS key type tag
+ * @param evidence$3 expression key type tag
+ * @tparam A LHS key type
+ * @tparam B RHS key type
+ * @tparam K result key type
+ */
 abstract class AbstractBinaryOp[A: ClassTag, B: ClassTag, K: ClassTag]
   extends CheckpointAction[K] with DrmLike[K] {


http://git-wip-us.apache.org/repos/asf/mahout/blob/3dd18344/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala
new file mode 100644
index 0000000..f438728
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala
@@ -0,0 +1,18 @@
+package org.apache.mahout.math.drm.logical
+
+import org.apache.mahout.math.drm.DrmLike
+import scala.reflect.ClassTag
+
+/** Parallelism operator */
+case class OpPar[K: ClassTag](
+    override var A: DrmLike[K],
+    val minSplits: Int = -1,
+    val exactSplits: Int = -1)
+    extends AbstractUnaryOp[K, K] {
+
+  /** R-like syntax for number of rows. */
+  def nrow: Long = A.nrow
+
+  /** R-like syntax for number of columns */
+  def ncol: Int = A.ncol
+}


http://git-wip-us.apache.org/repos/asf/mahout/blob/3dd18344/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index ac99ffd..5dc566f 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -263,6 +263,13 @@
 
   <dependencies>
 
+    <!-- spark stuff - need to put this first so that spark's hadoop dependencies are used in tests -->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.major}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.mahout</groupId>
       <artifactId>mahout-math-scala</artifactId>
@@ -288,12 +295,6 @@
 
     <!-- 3rd-party -->
 
-    <!-- spark stuff -->
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_${scala.major}</artifactId>
-      <version>${spark.version}</version>
-    </dependency>
 
     <!-- scala stuff -->


http://git-wip-us.apache.org/repos/asf/mahout/blob/3dd18344/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index a4eef9d..996eb1b 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -32,6 +32,7 @@ import scala.collection.JavaConversions._
 import org.apache.spark.SparkContext
 import org.apache.mahout.math.drm._
 import org.apache.mahout.math.drm.RLikeDrmOps._
+import org.apache.spark.rdd.RDD
 
 /** Spark-specific non-drm-method operations */
 object SparkEngine extends DistributedEngine {
@@ -124,40 +125,46 @@ object SparkEngine extends DistributedEngine {
    *
    * @return DRM[Any] where Any is automatically translated to value type
    */
-  def drmFromHDFS (path: String)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
-    implicit val scc:SparkContext = sc
-    val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable]).map(t => (t._1, t._2.get()))
-
-    val key = rdd.map(_._1).take(1)(0)
-    val keyWClass = key.getClass.asSubclass(classOf[Writable])
-
-    val key2val = key match {
-      case xx: IntWritable => (v: AnyRef) => v.asInstanceOf[IntWritable].get
-      case xx: Text => (v: AnyRef) => v.asInstanceOf[Text].toString
-      case xx: LongWritable => (v: AnyRef) => v.asInstanceOf[LongWritable].get
-      case xx: Writable => (v: AnyRef) => v
-    }
+  def drmFromHDFS(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
 
-    val val2key = key match {
-      case xx: IntWritable => (x: Any) => new IntWritable(x.asInstanceOf[Int])
-      case xx: Text => (x: Any) => new Text(x.toString)
-      case xx: LongWritable => (x: Any) => new LongWritable(x.asInstanceOf[Int])
-      case xx: Writable => (x: Any) => x.asInstanceOf[Writable]
-    }
+    val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minSplits = parMin)
+      // Get rid of VectorWritable
+      .map(t => (t._1, t._2.get()))
+
+    def getKeyClassTag[K: ClassTag, V](rdd: RDD[(K, V)]) = implicitly[ClassTag[K]]
+
+    // Spark should've loaded the type info from the header, right?
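+    // (The key type is now probed through the RDD's ClassTag rather than by sampling the first
+    // record with take(1), which is what the previous version of this method did.)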
+    val keyTag = getKeyClassTag(rdd)
+
+    val (key2valFunc, val2keyFunc, unwrappedKeyTag) = keyTag match {
+
+      case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[IntWritable]]) => (
+        (v: AnyRef) => v.asInstanceOf[IntWritable].get,
+        (x: Any) => new IntWritable(x.asInstanceOf[Int]),
+        implicitly[ClassTag[Int]])
+
+      case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[Text]]) => (
+        (v: AnyRef) => v.asInstanceOf[Text].toString,
+        (x: Any) => new Text(x.toString),
+        implicitly[ClassTag[String]])
+
+      case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[LongWritable]]) => (
+        (v: AnyRef) => v.asInstanceOf[LongWritable].get,
+        // Note: a Long key must be unwrapped as Long; asInstanceOf[Int] here would fail at runtime.
+        (x: Any) => new LongWritable(x.asInstanceOf[Long]),
+        implicitly[ClassTag[Long]])
 
-    val km = key match {
-      case xx: IntWritable => implicitly[ClassTag[Int]]
-      case xx: Text => implicitly[ClassTag[String]]
-      case xx: LongWritable => implicitly[ClassTag[Long]]
-      case xx: Writable => ClassTag(classOf[Writable])
+      case xx: ClassTag[Writable] => (
+        (v: AnyRef) => v,
+        (x: Any) => x.asInstanceOf[Writable],
+        ClassTag(classOf[Writable]))
     }
 
     {
-      implicit def getWritable(x: Any): Writable = val2key()
+      implicit def getWritable(x: Any): Writable = val2keyFunc(x)
       new CheckpointedDrmSpark(
-        rdd = rdd.map(t => (key2val(t._1), t._2)),
+        rdd = rdd.map(t => (key2valFunc(t._1), t._2)),
         _cacheStorageLevel = StorageLevel.MEMORY_ONLY
-      )(km.asInstanceOf[ClassTag[Any]])
+      )(unwrappedKeyTag.asInstanceOf[ClassTag[Any]])
     }
   }
 
@@ -254,6 +261,7 @@ object SparkEngine extends DistributedEngine {
         ncol = blockOp.ncol,
         bmf = blockOp.bmf
       )
+      case op@OpPar(a, _, _) => Par.exec(op, tr2phys(a)(op.classTagA))
       case cp: CheckpointedDrm[K] => new DrmRddInput[K](rowWiseSrc = Some((cp.ncol, cp.rdd)))
       case _ => throw new IllegalArgumentException("Internal:Optimizer has no exec policy for operator %s."
           .format(oper))


http://git-wip-us.apache.org/repos/asf/mahout/blob/3dd18344/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
new file mode 100644
index 0000000..e73376d
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
@@ -0,0 +1,50 @@
+package org.apache.mahout.sparkbindings.blas
+
+import scala.reflect.ClassTag
+import org.apache.mahout.sparkbindings.drm.DrmRddInput
+import org.apache.mahout.math.drm.logical.OpPar
+import org.apache.spark.rdd.RDD
+
+/** Physical adjustment of parallelism */
+object Par {
+
+  def exec[K: ClassTag](op: OpPar[K], src: DrmRddInput[K]): DrmRddInput[K] = {
+
+    def adjust[T](rdd: RDD[T]): RDD[T] =
+      if (op.minSplits > 0) {
+        if (rdd.partitions.size < op.minSplits)
+          rdd.coalesce(op.minSplits, shuffle = true)
+        else rdd.coalesce(rdd.partitions.size)
+      } else if (op.exactSplits > 0) {
+        if (op.exactSplits < rdd.partitions.size)
+          rdd.coalesce(numPartitions = op.exactSplits, shuffle = false)
+        else if (op.exactSplits > rdd.partitions.size)
+          rdd.coalesce(numPartitions = op.exactSplits, shuffle = true)
+        else
+          rdd.coalesce(rdd.partitions.size)
+      } else if (op.exactSplits == -1 && op.minSplits == -1) {
+
+        // Auto adjustment: try to scale up to either x1Size or x2Size.
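+        // clusterSize comes from spark.default.parallelism. Partition counts at or below
+        // x1Size (~0.95 x clusterSize) are scaled up to x1Size, counts at or below x2Size
+        // (~1.9 x clusterSize) are scaled up to x2Size, and anything larger is left as is.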
+        val clusterSize = rdd.context.getConf.get("spark.default.parallelism", "1").toInt
+
+        val x1Size = (clusterSize * .95).ceil.toInt
+        val x2Size = (clusterSize * 1.9).ceil.toInt
+
+        if (rdd.partitions.size <= x1Size)
+          rdd.coalesce(numPartitions = x1Size, shuffle = true)
+        else if (rdd.partitions.size <= x2Size)
+          rdd.coalesce(numPartitions = x2Size, shuffle = true)
+        else
+          rdd.coalesce(numPartitions = rdd.partitions.size)
+      } else rdd.coalesce(rdd.partitions.size)
+
+    if (src.isBlockified) {
+      val rdd = src.toBlockifiedDrmRdd()
+      new DrmRddInput[K](blockifiedSrc = Some(adjust(rdd)))
+    } else {
+      val rdd = src.toDrmRdd()
+      new DrmRddInput[K](rowWiseSrc = Some(op.ncol -> adjust(rdd)))
+    }
+  }
+
+}


http://git-wip-us.apache.org/repos/asf/mahout/blob/3dd18344/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
index 3801c77..b72818c 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
@@ -32,6 +32,10 @@ class DrmRddInput[K: ClassTag](
 
   private lazy val backingRdd = rowWiseSrc.map(_._2).getOrElse(blockifiedSrc.get)
 
+  def isBlockified: Boolean = blockifiedSrc.isDefined
+
+  def isRowWise: Boolean = rowWiseSrc.isDefined
+
   def toDrmRdd(): DrmRdd[K] = rowWiseSrc.map(_._2).getOrElse(deblockify(rdd = blockifiedSrc.get))
 
   def toBlockifiedDrmRdd() = blockifiedSrc.getOrElse(blockify(rdd = rowWiseSrc.get._2, blockncol = rowWiseSrc.get._1))


http://git-wip-us.apache.org/repos/asf/mahout/blob/3dd18344/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
index 6c71e11..81ffccf 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
@@ -23,6 +23,7 @@ import scalabindings._
 import drm._
 import RLikeOps._
 import RLikeDrmOps._
+import org.apache.mahout.sparkbindings._
 import org.scalatest.FunSuite
 import org.apache.mahout.sparkbindings.test.MahoutLocalContext
 
@@ -91,4 +92,27 @@ class DrmLikeOpsSuite extends FunSuite with MahoutLocalContext {
 
   }
 
+  test("exact, min and auto ||") {
+    val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))
+    val A = drmParallelize(m = inCoreA, numPartitions = 2)
+
+    A.rdd.partitions.size should equal(2)
+
+    (A + 1.0).par(exact = 4).rdd.partitions.size should equal(4)
+    A.par(exact = 2).rdd.partitions.size should equal(2)
+    A.par(exact = 1).rdd.partitions.size should equal(1)
+    A.par(exact = 0).rdd.partitions.size should equal(2) // No effect for par <= 0
+    A.par(min = 4).rdd.partitions.size should equal(4)
+    A.par(min = 2).rdd.partitions.size should equal(2)
+    A.par(min = 1).rdd.partitions.size should equal(2)
+    A.par(auto = true).rdd.partitions.size should equal(10)
+    A.par(exact = 10).par(auto = true).rdd.partitions.size should equal(10)
+    A.par(exact = 11).par(auto = true).rdd.partitions.size should equal(19)
+    A.par(exact = 20).par(auto = true).rdd.partitions.size should equal(20)
+
+    intercept[AssertionError] {
+      A.par()
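+      // All-defaults call: none of min, exact or auto is set, so the assert in par(...) fires.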
} + } + } http://git-wip-us.apache.org/repos/asf/mahout/blob/3dd18344/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala ---------------------------------------------------------------------- diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala index d9e89bc..c48cfc7 100644 --- a/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala +++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/test/MahoutLocalContext.scala @@ -21,6 +21,7 @@ trait MahoutLocalContext extends MahoutSuite with LoggerConfiguration { sparkConf = new SparkConf() .set("spark.kryoserializer.buffer.mb", "15") .set("spark.akka.frameSize", "30") + .set("spark.default.parallelism", "10") ) }
