Repository: mahout
Updated Branches:
  refs/heads/flink-binding f5a4a9762 -> 9f6f62acf


Unifying "keyClassTag" of checkpoints and "classTagK" of logical operators and
elevating "keyClassTag" into the DrmLike[] trait. No more logical forks.


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/0df6e08f
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/0df6e08f
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/0df6e08f

Branch: refs/heads/flink-binding
Commit: 0df6e08f27c76c6eeb4a714e0087722bef166970
Parents: c72698a
Author: Dmitriy Lyubimov <[email protected]>
Authored: Mon Oct 19 23:26:43 2015 -0700
Committer: Dmitriy Lyubimov <[email protected]>
Committed: Mon Oct 19 23:26:43 2015 -0700

----------------------------------------------------------------------
 .../mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala      | 5 +++--
 .../main/scala/org/apache/mahout/flinkbindings/package.scala | 7 +------
 .../main/scala/org/apache/mahout/h2obindings/H2OEngine.scala | 2 +-
 .../apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala   | 5 ++---
 .../scala/org/apache/mahout/math/drm/CheckpointedDrm.scala   | 8 --------
 .../src/main/scala/org/apache/mahout/math/drm/DrmLike.scala  | 2 ++
 .../apache/mahout/math/drm/logical/AbstractBinaryOp.scala    | 2 --
 .../org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala | 2 --
 .../apache/mahout/math/drm/logical/CheckpointAction.scala    | 6 ++++--
 .../mahout/sparkbindings/drm/CheckpointedDrmSpark.scala      | 6 +++---
 10 files changed, 16 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/0df6e08f/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
----------------------------------------------------------------------
diff --git 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
index ee392b0..b6e6211 100644
--- 
a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
+++ 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala
@@ -20,7 +20,7 @@ package org.apache.mahout.flinkbindings.drm
 
 import scala.collection.JavaConverters._
 import scala.util.Random
-import scala.reflect.ClassTag
+import scala.reflect.{ClassTag, classTag}
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat
@@ -75,7 +75,8 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K],
     list.head
   }
 
-  def keyClassTag: ClassTag[K] = implicitly[ClassTag[K]]
+
+  override val keyClassTag: ClassTag[K] = classTag[K]
 
   def cache() = {
     // TODO

http://git-wip-us.apache.org/repos/asf/mahout/blob/0df6e08f/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala 
b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
index 57d2f48..c77a551 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala
@@ -108,11 +108,6 @@ package object flinkbindings {
     new CheckpointedFlinkDrm[K](dataset)
   }
 
-  private[flinkbindings] def extractRealClassTag[K: ClassTag](drm: 
DrmLike[K]): ClassTag[_] = drm match {
-    case d: CheckpointAction[K] => d.classTag
-    case d: CheckpointedFlinkDrm[K] => d.keyClassTag
-    // will not always return correct result, often result in Any
-    case _ => implicitly[ClassTag[K]]
-  }
+  private[flinkbindings] def extractRealClassTag[K: ClassTag](drm: 
DrmLike[K]): ClassTag[_] = drm.keyClassTag
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mahout/blob/0df6e08f/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala 
b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
index bcf3507..463e9f5 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
@@ -112,7 +112,7 @@ object H2OEngine extends DistributedEngine {
       case op@OpRowRange(a, r) => RowRange.exec(tr2phys(a)(op.classTagA), r)
       // Custom operators
       case blockOp: OpMapBlock[K, _] => 
MapBlock.exec(tr2phys(blockOp.A)(blockOp.classTagA), blockOp.ncol, blockOp.bmf,
-        (blockOp.classTagK == implicitly[ClassTag[String]]), 
blockOp.classTagA, blockOp.classTagK)
+        (blockOp.keyClassTag == implicitly[ClassTag[String]]), 
blockOp.classTagA, blockOp.keyClassTag)
       case op@OpPar(a, m, e) => Par.exec(tr2phys(a)(op.classTagA), m, e)
       case cp: CheckpointedDrm[K] => cp.h2odrm
       case _ => throw new IllegalArgumentException("Internal:Optimizer has no 
exec policy for operator %s."

http://git-wip-us.apache.org/repos/asf/mahout/blob/0df6e08f/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
----------------------------------------------------------------------
diff --git 
a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala 
b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
index 371e8b4..f15e2bb 100644
--- 
a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
+++ 
b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
@@ -18,6 +18,8 @@ class CheckpointedDrmH2O[K: ClassTag](
   val context: DistributedContext
 ) extends CheckpointedDrm[K] {
 
+  override val keyClassTag: ClassTag[K] = classTag[K]
+
   /**
     * Collecting DRM to in-core Matrix
     *
@@ -27,9 +29,6 @@ class CheckpointedDrmH2O[K: ClassTag](
     */
   def collect: Matrix = H2OHelper.matrixFromDrm(h2odrm)
 
-  /** Explicit extraction of key class Tag   */
-  def keyClassTag: ClassTag[K] = implicitly[ClassTag[K]]
-
   /* XXX: call frame.remove */
   def uncache(): this.type = this
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/0df6e08f/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
index 7f97481..43a400d 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
@@ -18,7 +18,6 @@
 package org.apache.mahout.math.drm
 
 import org.apache.mahout.math.Matrix
-import scala.reflect.ClassTag
 
 /**
  * Checkpointed DRM API. This is a matrix that has optimized RDD lineage 
behind it and can be
@@ -34,13 +33,6 @@ trait CheckpointedDrm[K] extends DrmLike[K] {
   /** If this checkpoint is already declared cached, uncache. */
   def uncache(): this.type
 
-  /**
-   * Explicit extraction of key class Tag since traits don't support context 
bound access; but actual
-   * implementation knows it
-   */
-  def keyClassTag: ClassTag[K]
-
-
   /** changes the number of rows without touching the underlying data */
   def newRowCardinality(n: Int): CheckpointedDrm[K]
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/0df6e08f/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
index c5ba025..d6d9d38 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
@@ -25,6 +25,8 @@ import scala.reflect.ClassTag
  */
 trait DrmLike[K] {
 
+  val keyClassTag: ClassTag[K]
+
   protected[mahout] def partitioningTag: Long
 
   protected[mahout] def canHaveMissingRows: Boolean

http://git-wip-us.apache.org/repos/asf/mahout/blob/0df6e08f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
index 3b6b8bf..b2ad6fb 100644
--- 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
+++ 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
@@ -49,6 +49,4 @@ abstract class AbstractBinaryOp[A: ClassTag, B: ClassTag, K: 
ClassTag]
 
   def classTagB: ClassTag[B] = implicitly[ClassTag[B]]
 
-  def classTagK: ClassTag[K] = implicitly[ClassTag[K]]
-
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/0df6e08f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
index 60b2c77..9e6ab77 100644
--- 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
+++ 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
@@ -30,8 +30,6 @@ abstract class AbstractUnaryOp[A: ClassTag, K: ClassTag]
 
   def classTagA: ClassTag[A] = implicitly[ClassTag[A]]
 
-  def classTagK: ClassTag[K] = implicitly[ClassTag[K]]
-
   override protected[mahout] lazy val canHaveMissingRows: Boolean = 
A.canHaveMissingRows
 
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/0df6e08f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
----------------------------------------------------------------------
diff --git 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
index 2324ca2..87235f4 100644
--- 
a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
+++ 
b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
@@ -17,13 +17,16 @@
 
 package org.apache.mahout.math.drm.logical
 
-import scala.reflect.ClassTag
+import scala.reflect.{ClassTag, classTag}
 import scala.util.Random
 import org.apache.mahout.math.drm._
 
 /** Implementation of distributed expression checkpoint and optimizer. */
 abstract class CheckpointAction[K: ClassTag] extends DrmLike[K] {
 
+
+  override val keyClassTag: ClassTag[K] = classTag[K]
+
   protected[mahout] lazy val partitioningTag: Long = Random.nextLong()
 
   private[mahout] var cp:Option[CheckpointedDrm[K]] = None
@@ -44,6 +47,5 @@ abstract class CheckpointAction[K: ClassTag] extends 
DrmLike[K] {
     case Some(cp) => cp
   }
 
-  val classTag = implicitly[ClassTag[K]]
 }
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/0df6e08f/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git 
a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index 2f5d600..797a5c2 100644
--- 
a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ 
b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -51,6 +51,9 @@ class CheckpointedDrmSpark[K: ClassTag](
     private var _canHaveMissingRows: Boolean = false
     ) extends CheckpointedDrm[K] {
 
+
+  override val keyClassTag: ClassTag[K] = classTag[K]
+
   lazy val nrow = if (_nrow >= 0) _nrow else computeNRow
   lazy val ncol = if (_ncol >= 0) _ncol else computeNCol
   lazy val canHaveMissingRows: Boolean = {
@@ -64,9 +67,6 @@ class CheckpointedDrmSpark[K: ClassTag](
   private var cached: Boolean = false
   override val context: DistributedContext = rddInput.backingRdd.context
 
-  /** Explicit extraction of key class Tag   */
-  def keyClassTag: ClassTag[K] = implicitly[ClassTag[K]]
-
   /**
    * Action operator -- does not necessary means Spark action; but does mean 
running BLAS optimizer
    * and writing down Spark graph lineage since last checkpointed DRM.

Reply via email to