Repository: mahout Updated Branches: refs/heads/master 85e543c9a -> a4bba8261
MAHOUT-1802: Capture attached checkpoints (if cached) closes apache/mahout#185 Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/a4bba826 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/a4bba826 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/a4bba826 Branch: refs/heads/master Commit: a4bba8261f848ea7833bd5723a515ee3bd10989c Parents: 85e543c Author: Andrew Palumbo <[email protected]> Authored: Tue Mar 8 22:55:36 2016 -0500 Committer: Andrew Palumbo <[email protected]> Committed: Tue Mar 8 22:55:36 2016 -0500 ---------------------------------------------------------------------- .../apache/mahout/h2obindings/H2OEngine.scala | 13 +++---- .../h2obindings/drm/CheckpointedDrmH2O.scala | 4 ++- .../mahout/math/drm/CheckpointedDrm.scala | 4 +++ .../mahout/math/drm/DistributedEngine.scala | 3 ++ .../mahout/sparkbindings/SparkEngine.scala | 12 ++++--- .../drm/CheckpointedDrmSpark.scala | 38 +++++++++++--------- .../apache/mahout/sparkbindings/package.scala | 4 +-- 7 files changed, 48 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/a4bba826/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala ---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala index 5567f84..60bf7ac 100644 --- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala +++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala @@ -67,26 +67,27 @@ object H2OEngine extends DistributedEngine { def drmDfsRead(path: String, parMin: Int = 0)(implicit dc: DistributedContext): CheckpointedDrm[_] = { val drmMetadata = hdfsUtils.readDrmHeader(path) - new CheckpointedDrmH2O(H2OHdfs.drmFromFile(path, parMin), 
dc)(drmMetadata.keyClassTag.asInstanceOf[ClassTag[Any]]) + new CheckpointedDrmH2O(H2OHdfs.drmFromFile(path, parMin), dc, CacheHint.NONE)(drmMetadata.keyClassTag. + asInstanceOf[ClassTag[Any]]) } /** This creates an empty DRM with specified number of partitions and cardinality. */ def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int)(implicit dc: DistributedContext): CheckpointedDrm[Int] = - new CheckpointedDrmH2O[Int](H2OHelper.emptyDrm(nrow, ncol, numPartitions, -1), dc) + new CheckpointedDrmH2O[Int](H2OHelper.emptyDrm(nrow, ncol, numPartitions, -1), dc, CacheHint.NONE) def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int)(implicit dc: DistributedContext): CheckpointedDrm[Long] = - new CheckpointedDrmH2O[Long](H2OHelper.emptyDrm(nrow, ncol, numPartitions, -1), dc) + new CheckpointedDrmH2O[Long](H2OHelper.emptyDrm(nrow, ncol, numPartitions, -1), dc, CacheHint.NONE) /** Parallelize in-core matrix as H2O distributed matrix, using row ordinal indices as data set keys. */ def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int)(implicit dc: DistributedContext): CheckpointedDrm[Int] = - new CheckpointedDrmH2O[Int](H2OHelper.drmFromMatrix(m, numPartitions, -1), dc) + new CheckpointedDrmH2O[Int](H2OHelper.drmFromMatrix(m, numPartitions, -1), dc, CacheHint.NONE) /** Parallelize in-core matrix as H2O distributed matrix, using row labels as a data set keys. 
*/ def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int)(implicit dc: DistributedContext): CheckpointedDrm[String] = - new CheckpointedDrmH2O[String](H2OHelper.drmFromMatrix(m, numPartitions, -1), dc) + new CheckpointedDrmH2O[String](H2OHelper.drmFromMatrix(m, numPartitions, -1), dc, CacheHint.NONE) def toPhysical[K:ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = - new CheckpointedDrmH2O[K](tr2phys(plan), plan.context) + new CheckpointedDrmH2O[K](tr2phys(plan), plan.context, ch) /** Eagerly evaluate operator graph into an H2O DRM */ private def tr2phys[K: ClassTag](oper: DrmLike[K]): H2ODrm = { http://git-wip-us.apache.org/repos/asf/mahout/blob/a4bba826/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala ---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala index 371e8b4..043e75c 100644 --- a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala +++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala @@ -2,6 +2,7 @@ package org.apache.mahout.h2obindings.drm import org.apache.mahout.h2obindings._ import org.apache.mahout.math.Matrix +import org.apache.mahout.math.drm.CacheHint.CacheHint import org.apache.mahout.math.drm._ import scala.reflect._ @@ -15,7 +16,8 @@ import scala.reflect._ */ class CheckpointedDrmH2O[K: ClassTag]( val h2odrm: H2ODrm, - val context: DistributedContext + val context: DistributedContext, + override val cacheHint: CacheHint ) extends CheckpointedDrm[K] { /** http://git-wip-us.apache.org/repos/asf/mahout/blob/a4bba826/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala ---------------------------------------------------------------------- diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala index 78b7ce8..9a08740 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala @@ -18,11 +18,13 @@ package org.apache.mahout.math.drm import org.apache.mahout.math.Matrix +import org.apache.mahout.math.drm.CacheHint.CacheHint import scala.reflect.ClassTag /** * Checkpointed DRM API. This is a matrix that has optimized RDD lineage behind it and can be * therefore collected or saved. + * * @tparam K matrix key type (e.g. the keys of sequence files once persisted) */ trait CheckpointedDrm[K] extends DrmLike[K] { @@ -31,6 +33,8 @@ trait CheckpointedDrm[K] extends DrmLike[K] { def dfsWrite(path: String) + val cacheHint: CacheHint + /** If this checkpoint is already declared cached, uncache. */ def uncache(): this.type http://git-wip-us.apache.org/repos/asf/mahout/blob/a4bba826/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala index f4d209e..c27e8dd 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala @@ -141,6 +141,9 @@ object DistributedEngine { action match { + // Logical but previously had checkpoint attached to it already that has some caching policy to it + case cpa: CheckpointAction[K] if cpa.cp.exists(_.cacheHint != CacheHint.NONE) ⇒ cpa.cp.get + // self element-wise rewrite case OpAewB(a, b, op) if a == b => { op match { 
http://git-wip-us.apache.org/repos/asf/mahout/blob/a4bba826/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala index 99412df..5298343 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala @@ -126,7 +126,7 @@ object SparkEngine extends DistributedEngine { rddInput = rddInput, _nrow = plan.nrow, _ncol = plan.ncol, - _cacheStorageLevel = cacheHint2Spark(ch), + cacheHint = ch, partitioningTag = plan.partitioningTag ) newcp.cache() @@ -172,7 +172,8 @@ object SparkEngine extends DistributedEngine { def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1) (implicit sc: DistributedContext) : CheckpointedDrm[Int] = { - new CheckpointedDrmSpark(rddInput = parallelizeInCore(m, numPartitions), _nrow = m.nrow, _ncol = m.ncol) + new CheckpointedDrmSpark(rddInput = parallelizeInCore(m, numPartitions), _nrow = m.nrow, _ncol = m.ncol, + cacheHint = CacheHint.NONE) } private[sparkbindings] def parallelizeInCore(m: Matrix, numPartitions: Int = 1) @@ -191,7 +192,8 @@ object SparkEngine extends DistributedEngine { val rb = m.getRowLabelBindings val p = for (i: String ← rb.keySet().toIndexedSeq) yield i → m(rb(i), ::) - new CheckpointedDrmSpark(rddInput = sc.parallelize(p, numPartitions), _nrow = m.nrow, _ncol = m.ncol) + new CheckpointedDrmSpark(rddInput = sc.parallelize(p, numPartitions), _nrow = m.nrow, _ncol = m.ncol, + cacheHint = CacheHint.NONE) } /** This creates an empty DRM with specified number of partitions and cardinality. 
*/ @@ -204,7 +206,7 @@ object SparkEngine extends DistributedEngine { for (i ← partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector) }) - new CheckpointedDrmSpark[Int](rdd, nrow, ncol) + new CheckpointedDrmSpark[Int](rdd, nrow, ncol, cacheHint = CacheHint.NONE) } def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10) @@ -216,7 +218,7 @@ object SparkEngine extends DistributedEngine { for (i ← partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector) }) - new CheckpointedDrmSpark[Long](rdd, nrow, ncol) + new CheckpointedDrmSpark[Long](rdd, nrow, ncol, cacheHint = CacheHint.NONE) } /** http://git-wip-us.apache.org/repos/asf/mahout/blob/a4bba826/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala index 2f5d600..e369cf7 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala @@ -18,6 +18,7 @@ package org.apache.mahout.sparkbindings.drm import org.apache.mahout.math._ +import org.apache.mahout.math.drm.CacheHint.CacheHint import math._ import scalabindings._ import RLikeOps._ @@ -32,24 +33,26 @@ import org.apache.mahout.sparkbindings._ /** ==Spark-specific optimizer-checkpointed DRM.== * - * @param rddInput underlying rdd to wrap over. - * @param _nrow number of rows; if unspecified, we will compute with an inexpensive traversal. - * @param _ncol number of columns; if unspecified, we will try to guess with an inexpensive traversal. - * @param _cacheStorageLevel storage level - * @param partitioningTag unique partitioning tag. Used to detect identically partitioned operands. 
+ * @param rddInput underlying rdd to wrap over. + * @param _nrow number of rows; if unspecified, we will compute with an inexpensive traversal. + * @param _ncol number of columns; if unspecified, we will try to guess with an inexpensive traversal. + * @param cacheHint cache level to use. (Implementors usually want to override the default!) + * @param partitioningTag unique partitioning tag. Used to detect identically partitioned operands. * @param _canHaveMissingRows true if the matrix is int-keyed, and if it also may have missing rows * (will require a lazy fix for some physical operations. - * @param evidence$1 class tag context bound for K. + * @param evidence$1 class tag context bound for K. * @tparam K matrix key type (e.g. the keys of sequence files once persisted) */ class CheckpointedDrmSpark[K: ClassTag]( - private[sparkbindings] val rddInput: DrmRddInput[K], - private[sparkbindings] var _nrow: Long = -1L, - private[sparkbindings] var _ncol: Int = -1, - private val _cacheStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, - override protected[mahout] val partitioningTag: Long = Random.nextLong(), - private var _canHaveMissingRows: Boolean = false - ) extends CheckpointedDrm[K] { + private[sparkbindings] val rddInput: DrmRddInput[K], + private[sparkbindings] var _nrow: Long = -1L, + private[sparkbindings] var _ncol: Int = -1, + override val cacheHint: CacheHint = CacheHint.NONE, + override protected[mahout] val partitioningTag: Long = Random.nextLong(), + private var _canHaveMissingRows: Boolean = false + ) extends CheckpointedDrm[K] { + + private val _cacheStorageLevel: StorageLevel = SparkEngine.cacheHint2Spark(cacheHint) lazy val nrow = if (_nrow >= 0) _nrow else computeNRow lazy val ncol = if (_ncol >= 0) _ncol else computeNCol @@ -110,7 +113,8 @@ class CheckpointedDrmSpark[K: ClassTag]( * * Note that this pre-allocates target matrix and then assigns collected RDD to it * thus this likely would require about 2 times the RDD memory - * @return + * + 
* @return */ def collect: Matrix = { @@ -152,7 +156,8 @@ class CheckpointedDrmSpark[K: ClassTag]( /** * Dump matrix as computed Mahout's DRM into specified (HD)FS path - * @param path + * + * @param path */ def dfsWrite(path: String) = { val ktag = implicitly[ClassTag[K]] @@ -207,13 +212,14 @@ class CheckpointedDrmSpark[K: ClassTag]( /** Changes the number of rows in the DRM without actually touching the underlying data. Used to * redimension a DRM after it has been created, which implies some blank, non-existent rows. + * * @param n new row dimension * @return */ override def newRowCardinality(n: Int): CheckpointedDrm[K] = { assert(n > -1) assert( n >= nrow) - new CheckpointedDrmSpark(rddInput = rddInput, _nrow = n, _ncol = _ncol, _cacheStorageLevel = _cacheStorageLevel, + new CheckpointedDrmSpark(rddInput = rddInput, _nrow = n, _ncol = _ncol, cacheHint = cacheHint, partitioningTag = partitioningTag, _canHaveMissingRows = _canHaveMissingRows) } http://git-wip-us.apache.org/repos/asf/mahout/blob/a4bba826/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala index de309c3..ff2df63 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala @@ -140,8 +140,8 @@ package object sparkbindings { def drmWrap[K: ClassTag](rdd: DrmRdd[K], nrow: Long = -1, ncol: Int = -1, cacheHint: CacheHint.CacheHint = CacheHint.NONE, canHaveMissingRows: Boolean = false): CheckpointedDrm[K] = - new CheckpointedDrmSpark[K](rddInput = rdd, _nrow = nrow, _ncol = ncol, _cacheStorageLevel = SparkEngine - .cacheHint2Spark(cacheHint), _canHaveMissingRows = canHaveMissingRows) + new CheckpointedDrmSpark[K](rddInput = rdd, _nrow = nrow, _ncol = ncol, cacheHint = cacheHint, + 
_canHaveMissingRows = canHaveMissingRows) /** Another drmWrap version that takes in vertical block-partitioned input to form the matrix. */
