Repository: mahout
Updated Branches:
  refs/heads/master 85e543c9a -> a4bba8261


MAHOUT-1802: Capture attached checkpoints (if cached) closes apache/mahout#185


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/a4bba826
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/a4bba826
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/a4bba826

Branch: refs/heads/master
Commit: a4bba8261f848ea7833bd5723a515ee3bd10989c
Parents: 85e543c
Author: Andrew Palumbo <[email protected]>
Authored: Tue Mar 8 22:55:36 2016 -0500
Committer: Andrew Palumbo <[email protected]>
Committed: Tue Mar 8 22:55:36 2016 -0500

----------------------------------------------------------------------
 .../apache/mahout/h2obindings/H2OEngine.scala   | 13 +++----
 .../h2obindings/drm/CheckpointedDrmH2O.scala    |  4 ++-
 .../mahout/math/drm/CheckpointedDrm.scala       |  4 +++
 .../mahout/math/drm/DistributedEngine.scala     |  3 ++
 .../mahout/sparkbindings/SparkEngine.scala      | 12 ++++---
 .../drm/CheckpointedDrmSpark.scala              | 38 +++++++++++---------
 .../apache/mahout/sparkbindings/package.scala   |  4 +--
 7 files changed, 48 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
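
Note: in practical terms, the optimizer previously rebuilt a physical plan even
when an operator already carried a cached checkpoint, so cached intermediate
results could be silently recomputed. This commit makes the optimizer
short-circuit on such operators (see the DistributedEngine hunk below). A
minimal sketch of the user-visible effect, assuming the usual R-like DSL
imports and an implicit DistributedContext (drmA stands in for any
DrmLike[Int]):

    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.drm.RLikeDrmOps._

    // Attach a cached checkpoint to a pipeline.
    val cachedA = drmA.checkpoint(CacheHint.MEMORY_ONLY)

    // Plans composed over cachedA now reuse its materialized, cached result
    // instead of re-evaluating its lineage from scratch.
    val aat = cachedA %*% cachedA.t
    val ata = cachedA.t %*% cachedA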


http://git-wip-us.apache.org/repos/asf/mahout/blob/a4bba826/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
index 5567f84..60bf7ac 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
@@ -67,26 +67,27 @@ object H2OEngine extends DistributedEngine {
  def drmDfsRead(path: String, parMin: Int = 0)(implicit dc: DistributedContext): CheckpointedDrm[_] = {
    val drmMetadata = hdfsUtils.readDrmHeader(path)

-    new CheckpointedDrmH2O(H2OHdfs.drmFromFile(path, parMin), dc)(drmMetadata.keyClassTag.asInstanceOf[ClassTag[Any]])
+    new CheckpointedDrmH2O(H2OHdfs.drmFromFile(path, parMin), dc, CacheHint.NONE)(drmMetadata.keyClassTag.
+      asInstanceOf[ClassTag[Any]])
  }

  /** This creates an empty DRM with specified number of partitions and cardinality. */
  def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int)(implicit dc: DistributedContext): CheckpointedDrm[Int] =
-    new CheckpointedDrmH2O[Int](H2OHelper.emptyDrm(nrow, ncol, numPartitions, -1), dc)
+    new CheckpointedDrmH2O[Int](H2OHelper.emptyDrm(nrow, ncol, numPartitions, -1), dc, CacheHint.NONE)

  def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int)(implicit dc: DistributedContext): CheckpointedDrm[Long] =
-    new CheckpointedDrmH2O[Long](H2OHelper.emptyDrm(nrow, ncol, numPartitions, -1), dc)
+    new CheckpointedDrmH2O[Long](H2OHelper.emptyDrm(nrow, ncol, numPartitions, -1), dc, CacheHint.NONE)

  /** Parallelize in-core matrix as H2O distributed matrix, using row ordinal indices as data set keys. */
  def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int)(implicit dc: DistributedContext): CheckpointedDrm[Int] =
-    new CheckpointedDrmH2O[Int](H2OHelper.drmFromMatrix(m, numPartitions, -1), dc)
+    new CheckpointedDrmH2O[Int](H2OHelper.drmFromMatrix(m, numPartitions, -1), dc, CacheHint.NONE)

  /** Parallelize in-core matrix as H2O distributed matrix, using row labels as data set keys. */
  def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int)(implicit dc: DistributedContext): CheckpointedDrm[String] =
-    new CheckpointedDrmH2O[String](H2OHelper.drmFromMatrix(m, numPartitions, -1), dc)
+    new CheckpointedDrmH2O[String](H2OHelper.drmFromMatrix(m, numPartitions, -1), dc, CacheHint.NONE)

  def toPhysical[K:ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] =
-    new CheckpointedDrmH2O[K](tr2phys(plan), plan.context)
+    new CheckpointedDrmH2O[K](tr2phys(plan), plan.context, ch)

  /** Eagerly evaluate operator graph into an H2O DRM */
  private def tr2phys[K: ClassTag](oper: DrmLike[K]): H2ODrm = {
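
Note: the H2O engine change is mechanical. toPhysical now forwards the
optimizer's cache hint ch into the checkpoint it builds, and the read/parallelize
factories pass CacheHint.NONE explicitly, keeping those inputs uncached. A
hypothetical check (plan stands in for any DrmLike[K] with an H2O context):

    val cp = H2OEngine.toPhysical(plan, CacheHint.MEMORY_ONLY)
    // The hint chosen at checkpoint time now survives on the physical DRM:
    assert(cp.cacheHint == CacheHint.MEMORY_ONLY)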

http://git-wip-us.apache.org/repos/asf/mahout/blob/a4bba826/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
index 371e8b4..043e75c 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
@@ -2,6 +2,7 @@ package org.apache.mahout.h2obindings.drm
 
 import org.apache.mahout.h2obindings._
 import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.drm.CacheHint.CacheHint
 import org.apache.mahout.math.drm._
 
 import scala.reflect._
@@ -15,7 +16,8 @@ import scala.reflect._
   */
 class CheckpointedDrmH2O[K: ClassTag](
   val h2odrm: H2ODrm,
-  val context: DistributedContext
+  val context: DistributedContext,
+  override val cacheHint: CacheHint
 ) extends CheckpointedDrm[K] {
 
   /**
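
Note: CheckpointedDrmH2O simply records the hint passed in by H2OEngine and
exposes it through the new trait member; the H2O bindings do not translate it
into a storage level the way Spark does. A hypothetical direct construction
(callers normally go through H2OEngine; h2odrm and ctx are placeholders):

    val cp = new CheckpointedDrmH2O[Int](h2odrm, ctx, CacheHint.NONE)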

http://git-wip-us.apache.org/repos/asf/mahout/blob/a4bba826/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
index 78b7ce8..9a08740 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
@@ -18,11 +18,13 @@
 package org.apache.mahout.math.drm
 
 import org.apache.mahout.math.Matrix
+import org.apache.mahout.math.drm.CacheHint.CacheHint
 import scala.reflect.ClassTag
 
 /**
 * Checkpointed DRM API. This is a matrix that has optimized RDD lineage behind it and can be
  * therefore collected or saved.
+ *
  * @tparam K matrix key type (e.g. the keys of sequence files once persisted)
  */
 trait CheckpointedDrm[K] extends DrmLike[K] {
@@ -31,6 +33,8 @@ trait CheckpointedDrm[K] extends DrmLike[K] {
 
   def dfsWrite(path: String)
 
+  val cacheHint: CacheHint
+
   /** If this checkpoint is already declared cached, uncache. */
   def uncache(): this.type
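
Note: this abstract cacheHint member is the key API addition. It lets
engine-agnostic code, notably the optimizer, ask any checkpoint which caching
policy it was created with. A small sketch of the contract, assuming an
implicit DistributedContext and drmA standing in for any DrmLike[Int]:

    val cp: CheckpointedDrm[Int] = drmA.checkpoint(CacheHint.MEMORY_ONLY)
    // The hint used at checkpoint time is recorded on the checkpoint itself:
    require(cp.cacheHint == CacheHint.MEMORY_ONLY)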
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/a4bba826/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -141,6 +141,9 @@ object DistributedEngine {
 
     action match {
 
+      // A logical operator that already has a checkpoint attached with a caching policy: reuse it.
+      case cpa: CheckpointAction[K] if cpa.cp.exists(_.cacheHint != CacheHint.NONE) ⇒ cpa.cp.get
+
       // self element-wise rewrite
       case OpAewB(a, b, op) if a == b => {
         op match {
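
Note: this new case is the heart of the fix. During plan rewriting, a
CheckpointAction whose cp field already holds a checkpoint with a non-NONE
hint is returned as-is rather than re-optimized (and thus re-evaluated). The
guard relies on Option.exists, which is false for None, so operators without
an attached checkpoint still fall through to the rewrite cases below:

    // Option.exists semantics, which make the guard safe for
    // un-checkpointed actions:
    val attached: Option[Int] = Some(3)
    val absent: Option[Int] = None
    attached.exists(_ != 0)  // true  -> reuse the attached checkpoint
    absent.exists(_ != 0)    // false -> keep optimizing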

http://git-wip-us.apache.org/repos/asf/mahout/blob/a4bba826/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 99412df..5298343 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -126,7 +126,7 @@ object SparkEngine extends DistributedEngine {
       rddInput = rddInput,
       _nrow = plan.nrow,
       _ncol = plan.ncol,
-      _cacheStorageLevel = cacheHint2Spark(ch),
+      cacheHint = ch,
       partitioningTag = plan.partitioningTag
     )
     newcp.cache()
@@ -172,7 +172,8 @@ object SparkEngine extends DistributedEngine {
   def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
                                   (implicit sc: DistributedContext)
   : CheckpointedDrm[Int] = {
-    new CheckpointedDrmSpark(rddInput = parallelizeInCore(m, numPartitions), _nrow = m.nrow, _ncol = m.ncol)
+    new CheckpointedDrmSpark(rddInput = parallelizeInCore(m, numPartitions), _nrow = m.nrow, _ncol = m.ncol,
+      cacheHint = CacheHint.NONE)
  }

  private[sparkbindings] def parallelizeInCore(m: Matrix, numPartitions: Int = 1)
@@ -191,7 +192,8 @@ object SparkEngine extends DistributedEngine {
    val rb = m.getRowLabelBindings
    val p = for (i: String ← rb.keySet().toIndexedSeq) yield i → m(rb(i), ::)

-    new CheckpointedDrmSpark(rddInput = sc.parallelize(p, numPartitions), _nrow = m.nrow, _ncol = m.ncol)
+    new CheckpointedDrmSpark(rddInput = sc.parallelize(p, numPartitions), _nrow = m.nrow, _ncol = m.ncol,
+      cacheHint = CacheHint.NONE)
  }

  /** This creates an empty DRM with specified number of partitions and cardinality. */
@@ -204,7 +206,7 @@ object SparkEngine extends DistributedEngine {

      for (i ← partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
    })
-    new CheckpointedDrmSpark[Int](rdd, nrow, ncol)
+    new CheckpointedDrmSpark[Int](rdd, nrow, ncol, cacheHint = CacheHint.NONE)
  }

  def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
@@ -216,7 +218,7 @@ object SparkEngine extends DistributedEngine {

      for (i ← partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
     })
-    new CheckpointedDrmSpark[Long](rdd, nrow, ncol)
+    new CheckpointedDrmSpark[Long](rdd, nrow, ncol, cacheHint = CacheHint.NONE)
   }
 
   /**
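
Note: on the Spark side, toPhysical now stores the CacheHint itself on the
checkpoint rather than eagerly converting it to a Spark StorageLevel, and the
parallelize factories spell out cacheHint = CacheHint.NONE, so parallelized
inputs stay uncached unless the caller checkpoints a plan with a hint. A
sketch, assuming the R-like DSL imports, an implicit DistributedContext, and
an in-core matrix m:

    val drmA = drmParallelize(m, numPartitions = 2)  // drmA.cacheHint == CacheHint.NONE
    // Caching is an explicit opt-in when a logical plan is checkpointed:
    val drmAtA = (drmA.t %*% drmA).checkpoint(CacheHint.MEMORY_ONLY)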

http://git-wip-us.apache.org/repos/asf/mahout/blob/a4bba826/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index 2f5d600..e369cf7 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -18,6 +18,7 @@
 package org.apache.mahout.sparkbindings.drm
 
 import org.apache.mahout.math._
+import org.apache.mahout.math.drm.CacheHint.CacheHint
 import math._
 import scalabindings._
 import RLikeOps._
@@ -32,24 +33,26 @@ import org.apache.mahout.sparkbindings._
 
 /** ==Spark-specific optimizer-checkpointed DRM.==
   *
-  * @param rddInput underlying rdd to wrap over.
-  * @param _nrow number of rows; if unspecified, we will compute with an inexpensive traversal.
-  * @param _ncol number of columns; if unspecified, we will try to guess with an inexpensive traversal.
-  * @param _cacheStorageLevel storage level
-  * @param partitioningTag unique partitioning tag. Used to detect identically partitioned operands.
+  * @param rddInput            underlying rdd to wrap over.
+  * @param _nrow               number of rows; if unspecified, we will compute with an inexpensive traversal.
+  * @param _ncol               number of columns; if unspecified, we will try to guess with an inexpensive traversal.
+  * @param cacheHint           cache level to use. (Implementors usually want to override the default!)
+  * @param partitioningTag     unique partitioning tag. Used to detect identically partitioned operands.
   * @param _canHaveMissingRows true if the matrix is int-keyed, and if it also may have missing rows
   *                            (will require a lazy fix for some physical operations.
-  * @param evidence$1 class tag context bound for K.
+  * @param evidence$1          class tag context bound for K.
   * @tparam K matrix key type (e.g. the keys of sequence files once persisted)
   */
 class CheckpointedDrmSpark[K: ClassTag](
-    private[sparkbindings] val rddInput: DrmRddInput[K],
-    private[sparkbindings] var _nrow: Long = -1L,
-    private[sparkbindings] var _ncol: Int = -1,
-    private val _cacheStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
-    override protected[mahout] val partitioningTag: Long = Random.nextLong(),
-    private var _canHaveMissingRows: Boolean = false
-    ) extends CheckpointedDrm[K] {
+                                         private[sparkbindings] val rddInput: DrmRddInput[K],
+                                         private[sparkbindings] var _nrow: Long = -1L,
+                                         private[sparkbindings] var _ncol: Int = -1,
+                                         override val cacheHint: CacheHint = CacheHint.NONE,
+                                         override protected[mahout] val partitioningTag: Long = Random.nextLong(),
+                                         private var _canHaveMissingRows: Boolean = false
+                                       ) extends CheckpointedDrm[K] {
+
+  private val _cacheStorageLevel: StorageLevel = SparkEngine.cacheHint2Spark(cacheHint)
 
   lazy val nrow = if (_nrow >= 0) _nrow else computeNRow
   lazy val ncol = if (_ncol >= 0) _ncol else computeNCol
@@ -110,7 +113,8 @@ class CheckpointedDrmSpark[K: ClassTag](
    *
   * Note that this pre-allocates target matrix and then assigns collected RDD to it
    * thus this likely would require about 2 times the RDD memory
-   * @return
+    *
+    * @return
    */
   def collect: Matrix = {
 
@@ -152,7 +156,8 @@ class CheckpointedDrmSpark[K: ClassTag](
 
   /**
    * Dump matrix as computed Mahout's DRM into specified (HD)FS path
-   * @param path
+    *
+    * @param path
    */
   def dfsWrite(path: String) = {
     val ktag = implicitly[ClassTag[K]]
@@ -207,13 +212,14 @@ class CheckpointedDrmSpark[K: ClassTag](
 
  /** Changes the number of rows in the DRM without actually touching the underlying data. Used to
    * redimension a DRM after it has been created, which implies some blank, non-existent rows.
+    *
     * @param n new row dimension
     * @return
     */
   override def newRowCardinality(n: Int): CheckpointedDrm[K] = {
     assert(n > -1)
     assert( n >= nrow)
-    new CheckpointedDrmSpark(rddInput = rddInput, _nrow = n, _ncol = _ncol, _cacheStorageLevel = _cacheStorageLevel,
+    new CheckpointedDrmSpark(rddInput = rddInput, _nrow = n, _ncol = _ncol, cacheHint = cacheHint,
       partitioningTag = partitioningTag, _canHaveMissingRows = _canHaveMissingRows)
   }
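
Note: CheckpointedDrmSpark now derives its private StorageLevel from the hint
at construction via SparkEngine.cacheHint2Spark, so the hint stays queryable
while the StorageLevel remains an implementation detail. Roughly, and assuming
the obvious one-to-one mapping (abbreviated; the real cacheHint2Spark covers
every CacheHint value):

    // CacheHint.NONE            -> StorageLevel.NONE
    // CacheHint.MEMORY_ONLY     -> StorageLevel.MEMORY_ONLY
    // CacheHint.MEMORY_AND_DISK -> StorageLevel.MEMORY_AND_DISK
    // ... and so on for the serialized/replicated variants.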
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/a4bba826/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index de309c3..ff2df63 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -140,8 +140,8 @@ package object sparkbindings {
  def drmWrap[K: ClassTag](rdd: DrmRdd[K], nrow: Long = -1, ncol: Int = -1, cacheHint: CacheHint.CacheHint =
  CacheHint.NONE, canHaveMissingRows: Boolean = false): CheckpointedDrm[K] =

-    new CheckpointedDrmSpark[K](rddInput = rdd, _nrow = nrow, _ncol = ncol, _cacheStorageLevel = SparkEngine
-      .cacheHint2Spark(cacheHint), _canHaveMissingRows = canHaveMissingRows)
+    new CheckpointedDrmSpark[K](rddInput = rdd, _nrow = nrow, _ncol = ncol, cacheHint = cacheHint,
+      _canHaveMissingRows = canHaveMissingRows)
 
 
  /** Another drmWrap version that takes in vertical block-partitioned input to form the matrix. */
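
Note: drmWrap follows suit and hands the hint to CheckpointedDrmSpark
unconverted. A usage sketch, with rdd standing in for any DrmRdd[Int]:

    val drm = drmWrap(rdd, cacheHint = CacheHint.MEMORY_ONLY)
    // drm.cacheHint == CacheHint.MEMORY_ONLY; the StorageLevel is derived
    // inside CheckpointedDrmSpark.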
