Repository: mahout
Updated Branches:
  refs/heads/flink-binding f1018ed18 -> 46071e6f3
MAHOUT-1802: Capture attached checkpoints (if cached) closes apache/mahout#185

Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/46071e6f
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/46071e6f
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/46071e6f

Branch: refs/heads/flink-binding
Commit: 46071e6f3eb9ed53895222a54657326c83c62a53
Parents: f1018ed
Author: Andrew Palumbo <[email protected]>
Authored: Tue Mar 8 22:55:36 2016 -0500
Committer: smarthi <[email protected]>
Committed: Wed Mar 9 18:16:58 2016 -0500

----------------------------------------------------------------------
 .../apache/mahout/h2obindings/H2OEngine.scala    | 13 ++--
 .../h2obindings/drm/CheckpointedDrmH2O.scala     |  4 +-
 .../mahout/math/drm/CheckpointedDrm.scala        |  5 ++
 .../mahout/math/drm/DistributedEngine.scala      |  3 +
 .../mahout/sparkbindings/SparkEngine.scala       | 12 +--
 .../drm/CheckpointedDrmSpark.scala               | 81 +++++++++++---------
 .../apache/mahout/sparkbindings/package.scala    |  4 +-
 7 files changed, 71 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/46071e6f/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
index 5567f84..60bf7ac 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
@@ -67,26 +67,27 @@ object H2OEngine extends DistributedEngine {
   def drmDfsRead(path: String, parMin: Int = 0)(implicit dc: DistributedContext): CheckpointedDrm[_] = {
     val drmMetadata = hdfsUtils.readDrmHeader(path)

-    new CheckpointedDrmH2O(H2OHdfs.drmFromFile(path, parMin), dc)(drmMetadata.keyClassTag.asInstanceOf[ClassTag[Any]])
+    new CheckpointedDrmH2O(H2OHdfs.drmFromFile(path, parMin), dc, CacheHint.NONE)(drmMetadata.keyClassTag.
+      asInstanceOf[ClassTag[Any]])
   }

   /** This creates an empty DRM with specified number of partitions and cardinality. */
   def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int)(implicit dc: DistributedContext): CheckpointedDrm[Int] =
-    new CheckpointedDrmH2O[Int](H2OHelper.emptyDrm(nrow, ncol, numPartitions, -1), dc)
+    new CheckpointedDrmH2O[Int](H2OHelper.emptyDrm(nrow, ncol, numPartitions, -1), dc, CacheHint.NONE)

   def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int)(implicit dc: DistributedContext): CheckpointedDrm[Long] =
-    new CheckpointedDrmH2O[Long](H2OHelper.emptyDrm(nrow, ncol, numPartitions, -1), dc)
+    new CheckpointedDrmH2O[Long](H2OHelper.emptyDrm(nrow, ncol, numPartitions, -1), dc, CacheHint.NONE)

   /** Parallelize in-core matrix as H2O distributed matrix, using row ordinal indices as data set keys. */
   def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int)(implicit dc: DistributedContext): CheckpointedDrm[Int] =
-    new CheckpointedDrmH2O[Int](H2OHelper.drmFromMatrix(m, numPartitions, -1), dc)
+    new CheckpointedDrmH2O[Int](H2OHelper.drmFromMatrix(m, numPartitions, -1), dc, CacheHint.NONE)

   /** Parallelize in-core matrix as H2O distributed matrix, using row labels as a data set keys. */
   def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int)(implicit dc: DistributedContext): CheckpointedDrm[String] =
-    new CheckpointedDrmH2O[String](H2OHelper.drmFromMatrix(m, numPartitions, -1), dc)
+    new CheckpointedDrmH2O[String](H2OHelper.drmFromMatrix(m, numPartitions, -1), dc, CacheHint.NONE)

   def toPhysical[K:ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] =
-    new CheckpointedDrmH2O[K](tr2phys(plan), plan.context)
+    new CheckpointedDrmH2O[K](tr2phys(plan), plan.context, ch)

   /** Eagerly evaluate operator graph into an H2O DRM */
   private def tr2phys[K: ClassTag](oper: DrmLike[K]): H2ODrm = {

http://git-wip-us.apache.org/repos/asf/mahout/blob/46071e6f/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
index f15e2bb..faf584a 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
@@ -2,6 +2,7 @@ package org.apache.mahout.h2obindings.drm

 import org.apache.mahout.h2obindings._
 import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.drm.CacheHint.CacheHint
 import org.apache.mahout.math.drm._

 import scala.reflect._
@@ -15,7 +16,8 @@ import scala.reflect._
  */
 class CheckpointedDrmH2O[K: ClassTag](
   val h2odrm: H2ODrm,
-  val context: DistributedContext
+  val context: DistributedContext,
+  override val cacheHint: CacheHint
   ) extends CheckpointedDrm[K] {

   override val keyClassTag: ClassTag[K] = classTag[K]

http://git-wip-us.apache.org/repos/asf/mahout/blob/46071e6f/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
index 43a400d..9a08740 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
@@ -18,10 +18,13 @@ package org.apache.mahout.math.drm

 import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.drm.CacheHint.CacheHint
+import scala.reflect.ClassTag

 /**
  * Checkpointed DRM API. This is a matrix that has optimized RDD lineage behind it and can be
  * therefore collected or saved.
+ *
  * @tparam K matrix key type (e.g. the keys of sequence files once persisted)
  */
 trait CheckpointedDrm[K] extends DrmLike[K] {
@@ -30,6 +33,8 @@ trait CheckpointedDrm[K] extends DrmLike[K] {

   def dfsWrite(path: String)

+  val cacheHint: CacheHint
+
   /** If this checkpoint is already declared cached, uncache. */
   def uncache(): this.type

http://git-wip-us.apache.org/repos/asf/mahout/blob/46071e6f/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
index 420dc0e..4c442d0 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -141,6 +141,9 @@
     action match {

+      // Logical but previously had checkpoint attached to it already that has some caching policy to it
+      case cpa: CheckpointAction[K] if cpa.cp.exists(_.cacheHint != CacheHint.NONE) ⇒ cpa.cp.get
+
       // self element-wise rewrite
       case OpAewB(a, b, op) if (a == b) => {
         op match {

http://git-wip-us.apache.org/repos/asf/mahout/blob/46071e6f/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 2a22d64..8050ef3 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -123,7 +123,7 @@ object SparkEngine extends DistributedEngine {
       rddInput = rddInput,
       _nrow = plan.nrow,
       _ncol = plan.ncol,
-      _cacheStorageLevel = cacheHint2Spark(ch),
+      cacheHint = ch,
       partitioningTag = plan.partitioningTag
     )
     newcp.cache()
@@ -169,7 +169,8 @@ object SparkEngine extends DistributedEngine {
   def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
                                   (implicit sc: DistributedContext)
   : CheckpointedDrm[Int] = {
-    new CheckpointedDrmSpark(rddInput = parallelizeInCore(m, numPartitions), _nrow = m.nrow, _ncol = m.ncol)
+    new CheckpointedDrmSpark(rddInput = parallelizeInCore(m, numPartitions), _nrow = m.nrow, _ncol = m.ncol,
+      cacheHint = CacheHint.NONE)
   }

   private[sparkbindings] def parallelizeInCore(m: Matrix, numPartitions: Int = 1)
@@ -188,7 +189,8 @@ object SparkEngine extends DistributedEngine {
     val rb = m.getRowLabelBindings
     val p = for (i: String ← rb.keySet().toIndexedSeq) yield i → m(rb(i), ::)

-    new CheckpointedDrmSpark(rddInput = sc.parallelize(p, numPartitions), _nrow = m.nrow, _ncol = m.ncol)
+    new CheckpointedDrmSpark(rddInput = sc.parallelize(p, numPartitions), _nrow = m.nrow, _ncol = m.ncol,
+      cacheHint = CacheHint.NONE)
   }

   /** This creates an empty DRM with specified number of partitions and cardinality. */
@@ -201,7 +203,7 @@ object SparkEngine extends DistributedEngine {
       for (i ← partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
     })
-    new CheckpointedDrmSpark[Int](rdd, nrow, ncol)
+    new CheckpointedDrmSpark[Int](rdd, nrow, ncol, cacheHint = CacheHint.NONE)
   }

   def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
@@ -213,7 +215,7 @@ object SparkEngine extends DistributedEngine {
       for (i ← partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
     })
-    new CheckpointedDrmSpark[Long](rdd, nrow, ncol)
+    new CheckpointedDrmSpark[Long](rdd, nrow, ncol, cacheHint = CacheHint.NONE)
   }

   /**

http://git-wip-us.apache.org/repos/asf/mahout/blob/46071e6f/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index 857cca0..e369cf7 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -17,41 +17,42 @@ package org.apache.mahout.sparkbindings.drm

-import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
 import org.apache.mahout.math._
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.sparkbindings._
-import org.apache.spark.storage.StorageLevel
-
+import org.apache.mahout.math.drm.CacheHint.CacheHint
+import math._
+import scalabindings._
+import RLikeOps._
+import drm._
 import scala.collection.JavaConversions._
-import scala.math._
-import scala.reflect._
+import org.apache.spark.storage.StorageLevel
+import reflect._
 import scala.util.Random
+import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
+import org.apache.mahout.math.drm._
+import org.apache.mahout.sparkbindings._

 /** ==Spark-specific optimizer-checkpointed DRM.==
  *
- * @param rddInput underlying rdd to wrap over.
- * @param _nrow number of rows; if unspecified, we will compute with an inexpensive traversal.
- * @param _ncol number of columns; if unspecified, we will try to guess with an inexpensive traversal.
- * @param _cacheStorageLevel storage level
- * @param partitioningTag unique partitioning tag. Used to detect identically partitioned operands.
+ * @param rddInput underlying rdd to wrap over.
+ * @param _nrow number of rows; if unspecified, we will compute with an inexpensive traversal.
+ * @param _ncol number of columns; if unspecified, we will try to guess with an inexpensive traversal.
+ * @param cacheHint cache level to use. (Implementors usually want to override the default!)
+ * @param partitioningTag unique partitioning tag. Used to detect identically partitioned operands.
  * @param _canHaveMissingRows true if the matrix is int-keyed, and if it also may have missing rows
  *   (will require a lazy fix for some physical operations.
+ * @param evidence$1 class tag context bound for K.
  * @tparam K matrix key type (e.g. the keys of sequence files once persisted)
  */
 class CheckpointedDrmSpark[K: ClassTag](
-    private[sparkbindings] val rddInput: DrmRddInput[K],
-    private[sparkbindings] var _nrow: Long = -1L,
-    private[sparkbindings] var _ncol: Int = -1,
-    private val _cacheStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
-    override protected[mahout] val partitioningTag: Long = Random.nextLong(),
-    private var _canHaveMissingRows: Boolean = false
-    ) extends CheckpointedDrm[K] {
-
+    private[sparkbindings] val rddInput: DrmRddInput[K],
+    private[sparkbindings] var _nrow: Long = -1L,
+    private[sparkbindings] var _ncol: Int = -1,
+    override val cacheHint: CacheHint = CacheHint.NONE,
+    override protected[mahout] val partitioningTag: Long = Random.nextLong(),
+    private var _canHaveMissingRows: Boolean = false
+    ) extends CheckpointedDrm[K] {

-  override val keyClassTag: ClassTag[K] = classTag[K]
+  private val _cacheStorageLevel: StorageLevel = SparkEngine.cacheHint2Spark(cacheHint)

   lazy val nrow = if (_nrow >= 0) _nrow else computeNRow
   lazy val ncol = if (_ncol >= 0) _ncol else computeNCol
@@ -66,6 +67,9 @@ class CheckpointedDrmSpark[K: ClassTag](
   private var cached: Boolean = false
   override val context: DistributedContext = rddInput.backingRdd.context

+  /** Explicit extraction of key class Tag */
+  def keyClassTag: ClassTag[K] = implicitly[ClassTag[K]]
+
   /**
    * Action operator -- does not necessary means Spark action; but does mean running BLAS optimizer
    * and writing down Spark graph lineage since last checkpointed DRM.
@@ -109,7 +113,8 @@ class CheckpointedDrmSpark[K: ClassTag](
    *
    * Note that this pre-allocates target matrix and then assigns collected RDD to it
    * thus this likely would require about 2 times the RDD memory
-   * @return
+   *
+   * @return
    */
   def collect: Matrix = {
@@ -122,7 +127,7 @@ class CheckpointedDrmSpark[K: ClassTag](
     // since currently spark #collect() requires Serializeable support,
     // we serialize DRM vectors into byte arrays on backend and restore Vector
     // instances on the front end:
-    val data = rddInput.asRowWise().map(t => (t._1, t._2)).collect()
+    val data = rddInput.toDrmRdd().map(t => (t._1, t._2)).collect()

     val m = if (data.forall(_._2.isDense))
@@ -151,20 +156,21 @@ class CheckpointedDrmSpark[K: ClassTag](

   /**
    * Dump matrix as computed Mahout's DRM into specified (HD)FS path
-   * @param path
+   *
+   * @param path
    */
   def dfsWrite(path: String) = {
     val ktag = implicitly[ClassTag[K]]

     // Map backing RDD[(K,Vector)] to RDD[(K)Writable,VectorWritable)] and save.
     if (ktag.runtimeClass == classOf[Int]) {
-      rddInput.asRowWise()
+      rddInput.toDrmRdd()
        .map( x => (new IntWritable(x._1.asInstanceOf[Int]), new VectorWritable(x._2))).saveAsSequenceFile(path)
     } else if (ktag.runtimeClass == classOf[String]){
-      rddInput.asRowWise()
+      rddInput.toDrmRdd()
        .map( x => (new Text(x._1.asInstanceOf[String]), new VectorWritable(x._2))).saveAsSequenceFile(path)
     } else if (ktag.runtimeClass == classOf[Long]) {
-      rddInput.asRowWise()
+      rddInput.toDrmRdd()
        .map( x => (new LongWritable(x._1.asInstanceOf[Long]), new VectorWritable(x._2))).saveAsSequenceFile(path)
     } else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag))
@@ -175,44 +181,45 @@ class CheckpointedDrmSpark[K: ClassTag](
     val intRowIndex = classTag[K] == classTag[Int]

     if (intRowIndex) {
-      val rdd = cache().rddInput.asRowWise().asInstanceOf[DrmRdd[Int]]
+      val rdd = cache().rddInput.toDrmRdd().asInstanceOf[DrmRdd[Int]]

       // I guess it is a suitable place to compute int keys consistency test here because we know
       // that nrow can be computed lazily, which always happens when rdd is already available, cached,
       // and it's ok to compute small summaries without triggering huge pipelines. Which usually
       // happens right after things like drmFromHDFS or drmWrap().
-      val maxPlus1 = rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max) + 1L
+      val maxPlus1 = rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L
       val rowCount = rdd.count()
       _canHaveMissingRows = maxPlus1 != rowCount ||
         rdd.map(_._1).sum().toLong != (rowCount * (rowCount - 1.0) / 2.0).toLong
       intFixExtra = (maxPlus1 - rowCount) max 0L
       maxPlus1
     } else
-      cache().rddInput.asRowWise().count()
+      cache().rddInput.toDrmRdd().count()
   }

   protected def computeNCol = {
     rddInput.isBlockified match {
-      case true ⇒ rddInput.asBlockified(throw new AssertionError("not reached"))
-        .map(_._2.ncol).reduce(max)
-      case false ⇒ cache().rddInput.asRowWise().map(_._2.length).fold(-1)(max)
+      case true ⇒ rddInput.toBlockifiedDrmRdd(throw new AssertionError("not reached"))
+        .map(_._2.ncol).reduce(max(_, _))
+      case false ⇒ cache().rddInput.toDrmRdd().map(_._2.length).fold(-1)(max(_, _))
     }
   }

   protected def computeNNonZero =
-    cache().rddInput.asRowWise().map(_._2.getNumNonZeroElements.toLong).sum().toLong
+    cache().rddInput.toDrmRdd().map(_._2.getNumNonZeroElements.toLong).sum().toLong

   /** Changes the number of rows in the DRM without actually touching the underlying data. Used to
    * redimension a DRM after it has been created, which implies some blank, non-existent rows.
+   *
    * @param n new row dimension
    * @return
    */
   override def newRowCardinality(n: Int): CheckpointedDrm[K] = {
     assert(n > -1)
     assert( n >= nrow)
-    new CheckpointedDrmSpark(rddInput = rddInput, _nrow = n, _ncol = _ncol, _cacheStorageLevel = _cacheStorageLevel,
+    new CheckpointedDrmSpark(rddInput = rddInput, _nrow = n, _ncol = _ncol, cacheHint = cacheHint,
       partitioningTag = partitioningTag, _canHaveMissingRows = _canHaveMissingRows)
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/46071e6f/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index 2b55de6..5330581 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -140,8 +140,8 @@ package object sparkbindings {
   def drmWrap[K: ClassTag](rdd: DrmRdd[K], nrow: Long = -1, ncol: Int = -1,
                            cacheHint: CacheHint.CacheHint = CacheHint.NONE,
                            canHaveMissingRows: Boolean = false): CheckpointedDrm[K] =
-    new CheckpointedDrmSpark[K](rddInput = rdd, _nrow = nrow, _ncol = ncol, _cacheStorageLevel = SparkEngine
-      .cacheHint2Spark(cacheHint), _canHaveMissingRows = canHaveMissingRows)
+    new CheckpointedDrmSpark[K](rddInput = rdd, _nrow = nrow, _ncol = ncol, cacheHint = cacheHint,
+      _canHaveMissingRows = canHaveMissingRows)

   /** Another drmWrap version that takes in vertical block-partitioned input to form the matrix. */
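
----------------------------------------------------------------------

For illustration only, not part of the patch: a minimal sketch of the behavior the new pass1 rewrite rule and the cacheHint member enable. The local context created via mahoutSparkContext and the toy matrix are assumptions made for this example.

  import org.apache.mahout.math.scalabindings._
  import org.apache.mahout.math.drm._
  import org.apache.mahout.math.drm.RLikeDrmOps._
  import org.apache.mahout.sparkbindings._

  // Assumed setup: a local Spark-backed distributed context.
  implicit val ctx = mahoutSparkContext(masterUrl = "local[2]", appName = "MAHOUT-1802-sketch")

  val drmA = drmParallelize(dense((1, 2), (3, 4)), numPartitions = 2)

  // Checkpointing a logical node with a non-NONE hint now records the hint on the resulting
  // CheckpointedDrm (the new cacheHint member) and keeps the backing RDD cached.
  val drmAtA = drmA.t %*% drmA
  drmAtA.checkpoint(CacheHint.MEMORY_ONLY)

  // When the same logical node is reused in a larger expression, the optimizer captures the
  // attached cached checkpoint instead of re-optimizing and re-computing A'A.
  val drmB = (drmAtA %*% drmA.t).checkpoint()
  val inCoreB = drmB.collect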
