Repository: mahout Updated Branches: refs/heads/flink-binding f5a4a9762 -> 9f6f62acf
Unifying "keyClassTag" of checkpoints and "classTagK" of logical operators, and elevating "keyClassTag" into the DrmLike[K] trait. No more logical forks. Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/0df6e08f Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/0df6e08f Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/0df6e08f Branch: refs/heads/flink-binding Commit: 0df6e08f27c76c6eeb4a714e0087722bef166970 Parents: c72698a Author: Dmitriy Lyubimov <[email protected]> Authored: Mon Oct 19 23:26:43 2015 -0700 Committer: Dmitriy Lyubimov <[email protected]> Committed: Mon Oct 19 23:26:43 2015 -0700 ---------------------------------------------------------------------- .../mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala | 5 +++-- .../main/scala/org/apache/mahout/flinkbindings/package.scala | 7 +------ .../main/scala/org/apache/mahout/h2obindings/H2OEngine.scala | 2 +- .../apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala | 5 ++--- .../scala/org/apache/mahout/math/drm/CheckpointedDrm.scala | 8 -------- .../src/main/scala/org/apache/mahout/math/drm/DrmLike.scala | 2 ++ .../apache/mahout/math/drm/logical/AbstractBinaryOp.scala | 2 -- .../org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala | 2 -- .../apache/mahout/math/drm/logical/CheckpointAction.scala | 6 ++++-- .../mahout/sparkbindings/drm/CheckpointedDrmSpark.scala | 6 +++--- 10 files changed, 16 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/0df6e08f/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala index ee392b0..b6e6211 
100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala @@ -20,7 +20,7 @@ package org.apache.mahout.flinkbindings.drm import scala.collection.JavaConverters._ import scala.util.Random -import scala.reflect.ClassTag +import scala.reflect.{ClassTag, classTag} import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat @@ -75,7 +75,8 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], list.head } - def keyClassTag: ClassTag[K] = implicitly[ClassTag[K]] + + override val keyClassTag: ClassTag[K] = classTag[K] def cache() = { // TODO http://git-wip-us.apache.org/repos/asf/mahout/blob/0df6e08f/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala index 57d2f48..c77a551 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala @@ -108,11 +108,6 @@ package object flinkbindings { new CheckpointedFlinkDrm[K](dataset) } - private[flinkbindings] def extractRealClassTag[K: ClassTag](drm: DrmLike[K]): ClassTag[_] = drm match { - case d: CheckpointAction[K] => d.classTag - case d: CheckpointedFlinkDrm[K] => d.keyClassTag - // will not always return correct result, often result in Any - case _ => implicitly[ClassTag[K]] - } + private[flinkbindings] def extractRealClassTag[K: ClassTag](drm: DrmLike[K]): ClassTag[_] = drm.keyClassTag } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/0df6e08f/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala 
---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala index bcf3507..463e9f5 100644 --- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala +++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala @@ -112,7 +112,7 @@ object H2OEngine extends DistributedEngine { case op@OpRowRange(a, r) => RowRange.exec(tr2phys(a)(op.classTagA), r) // Custom operators case blockOp: OpMapBlock[K, _] => MapBlock.exec(tr2phys(blockOp.A)(blockOp.classTagA), blockOp.ncol, blockOp.bmf, - (blockOp.classTagK == implicitly[ClassTag[String]]), blockOp.classTagA, blockOp.classTagK) + (blockOp.keyClassTag == implicitly[ClassTag[String]]), blockOp.classTagA, blockOp.keyClassTag) case op@OpPar(a, m, e) => Par.exec(tr2phys(a)(op.classTagA), m, e) case cp: CheckpointedDrm[K] => cp.h2odrm case _ => throw new IllegalArgumentException("Internal:Optimizer has no exec policy for operator %s." 
http://git-wip-us.apache.org/repos/asf/mahout/blob/0df6e08f/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala ---------------------------------------------------------------------- diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala index 371e8b4..f15e2bb 100644 --- a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala +++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala @@ -18,6 +18,8 @@ class CheckpointedDrmH2O[K: ClassTag]( val context: DistributedContext ) extends CheckpointedDrm[K] { + override val keyClassTag: ClassTag[K] = classTag[K] + /** * Collecting DRM to in-core Matrix * @@ -27,9 +29,6 @@ class CheckpointedDrmH2O[K: ClassTag]( */ def collect: Matrix = H2OHelper.matrixFromDrm(h2odrm) - /** Explicit extraction of key class Tag */ - def keyClassTag: ClassTag[K] = implicitly[ClassTag[K]] - /* XXX: call frame.remove */ def uncache(): this.type = this http://git-wip-us.apache.org/repos/asf/mahout/blob/0df6e08f/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala index 7f97481..43a400d 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala @@ -18,7 +18,6 @@ package org.apache.mahout.math.drm import org.apache.mahout.math.Matrix -import scala.reflect.ClassTag /** * Checkpointed DRM API. This is a matrix that has optimized RDD lineage behind it and can be @@ -34,13 +33,6 @@ trait CheckpointedDrm[K] extends DrmLike[K] { /** If this checkpoint is already declared cached, uncache. 
*/ def uncache(): this.type - /** - * Explicit extraction of key class Tag since traits don't support context bound access; but actual - * implementation knows it - */ - def keyClassTag: ClassTag[K] - - /** changes the number of rows without touching the underlying data */ def newRowCardinality(n: Int): CheckpointedDrm[K] http://git-wip-us.apache.org/repos/asf/mahout/blob/0df6e08f/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala index c5ba025..d6d9d38 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala @@ -25,6 +25,8 @@ import scala.reflect.ClassTag */ trait DrmLike[K] { + val keyClassTag: ClassTag[K] + protected[mahout] def partitioningTag: Long protected[mahout] def canHaveMissingRows: Boolean http://git-wip-us.apache.org/repos/asf/mahout/blob/0df6e08f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala index 3b6b8bf..b2ad6fb 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala @@ -49,6 +49,4 @@ abstract class AbstractBinaryOp[A: ClassTag, B: ClassTag, K: ClassTag] def classTagB: ClassTag[B] = implicitly[ClassTag[B]] - def classTagK: ClassTag[K] = implicitly[ClassTag[K]] - } http://git-wip-us.apache.org/repos/asf/mahout/blob/0df6e08f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala 
---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala index 60b2c77..9e6ab77 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala @@ -30,8 +30,6 @@ abstract class AbstractUnaryOp[A: ClassTag, K: ClassTag] def classTagA: ClassTag[A] = implicitly[ClassTag[A]] - def classTagK: ClassTag[K] = implicitly[ClassTag[K]] - override protected[mahout] lazy val canHaveMissingRows: Boolean = A.canHaveMissingRows } http://git-wip-us.apache.org/repos/asf/mahout/blob/0df6e08f/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala ---------------------------------------------------------------------- diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala index 2324ca2..87235f4 100644 --- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala +++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala @@ -17,13 +17,16 @@ package org.apache.mahout.math.drm.logical -import scala.reflect.ClassTag +import scala.reflect.{ClassTag, classTag} import scala.util.Random import org.apache.mahout.math.drm._ /** Implementation of distributed expression checkpoint and optimizer. 
*/ abstract class CheckpointAction[K: ClassTag] extends DrmLike[K] { + + override val keyClassTag: ClassTag[K] = classTag[K] + protected[mahout] lazy val partitioningTag: Long = Random.nextLong() private[mahout] var cp:Option[CheckpointedDrm[K]] = None @@ -44,6 +47,5 @@ abstract class CheckpointAction[K: ClassTag] extends DrmLike[K] { case Some(cp) => cp } - val classTag = implicitly[ClassTag[K]] } http://git-wip-us.apache.org/repos/asf/mahout/blob/0df6e08f/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala index 2f5d600..797a5c2 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala @@ -51,6 +51,9 @@ class CheckpointedDrmSpark[K: ClassTag]( private var _canHaveMissingRows: Boolean = false ) extends CheckpointedDrm[K] { + + override val keyClassTag: ClassTag[K] = classTag[K] + lazy val nrow = if (_nrow >= 0) _nrow else computeNRow lazy val ncol = if (_ncol >= 0) _ncol else computeNCol lazy val canHaveMissingRows: Boolean = { @@ -64,9 +67,6 @@ class CheckpointedDrmSpark[K: ClassTag]( private var cached: Boolean = false override val context: DistributedContext = rddInput.backingRdd.context - /** Explicit extraction of key class Tag */ - def keyClassTag: ClassTag[K] = implicitly[ClassTag[K]] - /** * Action operator -- does not necessary means Spark action; but does mean running BLAS optimizer * and writing down Spark graph lineage since last checkpointed DRM.
