Repository: mahout
Updated Branches:
  refs/heads/master 6919fd9fe -> fcd6b9e01


http://git-wip-us.apache.org/repos/asf/mahout/blob/fcd6b9e0/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 99412df..42ddceb 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -24,7 +24,7 @@ import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
 import scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.drm.logical._
-import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSparkOps, cpDrmGeneric2DrmRddInput, CheckpointedDrmSpark, DrmRddInput}
+import org.apache.mahout.sparkbindings.drm.{cpDrmGeneric2DrmRddInput, CheckpointedDrmSpark, DrmRddInput}
 import org.apache.mahout.math._
 import scala.Predef
 import scala.reflect.ClassTag
@@ -46,7 +46,7 @@ object SparkEngine extends DistributedEngine {
   // By default, use Hadoop 1 utils
   var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
 
-  def colSums[K](drm: CheckpointedDrm[K]): Vector = {
+  def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
     val n = drm.ncol
 
     drm.rdd
@@ -56,7 +56,7 @@ object SparkEngine extends DistributedEngine {
 
       // Fold() doesn't work with kryo still. So work around it.
       .mapPartitions(iter ⇒ {
-      val acc = ((new DenseVector(n): Vector) /: iter) ((acc, v) ⇒ acc += v)
+      val acc = ((new DenseVector(n): Vector) /: iter)((acc, v) ⇒  acc += v)
       Iterator(acc)
     })
 
@@ -65,7 +65,7 @@ object SparkEngine extends DistributedEngine {
       .reduce(_ += _)
   }
 
-  def numNonZeroElementsPerColumn[K](drm: CheckpointedDrm[K]): Vector = {
+  def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
     val n = drm.ncol
 
     drm.rdd
@@ -76,7 +76,7 @@ object SparkEngine extends DistributedEngine {
       // Fold() doesn't work with kryo still. So work around it.
       .mapPartitions(iter ⇒ {
       val acc = ((new DenseVector(n): Vector) /: iter) { (acc, v) ⇒
-        v.nonZeroes().foreach { elem ⇒ acc(elem.index) += 1 }
+        v.nonZeroes().foreach { elem ⇒  acc(elem.index) += 1}
         acc
       }
       Iterator(acc)
@@ -87,10 +87,10 @@ object SparkEngine extends DistributedEngine {
   }
 
   /** Engine-specific colMeans implementation based on a checkpoint. */
-  override def colMeans[K](drm: CheckpointedDrm[K]): Vector =
+  override def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector =
     if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
 
-  override def norm[K](drm: CheckpointedDrm[K]): Double =
+  override def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double =
     drm.rdd
       // Compute sum of squares of each vector
       .map {
@@ -100,7 +100,7 @@ object SparkEngine extends DistributedEngine {
 
 
   /** Optional engine-specific all reduce tensor operation. */
-  override def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf:
+  override def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf:
   BlockReduceFunc): Matrix = {
 
     import drm._
@@ -108,11 +108,11 @@ object SparkEngine extends DistributedEngine {
   }
 
   /**
-    * Perform default expression rewrite. Return physical plan that we can pass to exec(). <P>
-    *
-    * A particular physical engine implementation may choose to either use or not use these rewrites
-    * as a useful basic rewriting rule.<P>
-    */
+   * Perform default expression rewrite. Return physical plan that we can pass to exec(). <P>
+   *
+   * A particular physical engine implementation may choose to either use or not use these rewrites
+   * as a useful basic rewriting rule.<P>
+   */
   override def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = super.optimizerRewrite(action)
 
 
@@ -139,13 +139,14 @@ object SparkEngine extends DistributedEngine {
   def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] = dc.broadcast(m)
 
   /**
-    * Load DRM from hdfs (as in Mahout DRM format)
-    *
-    * @param path
-    * @param sc spark context (wanted to make that implicit, doesn't work in current version of
-    *           scala with the type bounds, sorry)
-    * @return DRM[Any] where Any is automatically translated to value type
-    */
+   * Load DRM from hdfs (as in Mahout DRM format)
+   *
+   * @param path
+   * @param sc spark context (wanted to make that implicit, doesn't work in current version of
+   *           scala with the type bounds, sorry)
+   *
+   * @return DRM[Any] where Any is automatically translated to value type
+   */
   def drmDfsRead(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
 
     // Require that context is actually Spark context.
@@ -162,7 +163,7 @@ object SparkEngine extends DistributedEngine {
     val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minPartitions = parMin)
 
       // Immediately convert keys and value writables into value types.
-      .map { case (wKey, wVec) ⇒ k2vFunc(wKey) -> wVec.get() }
+      .map { case (wKey, wVec) ⇒ k2vFunc(wKey) -> wVec.get()}
 
     // Wrap into a DRM type with correct matrix row key class tag evident.
     drmWrap(rdd = rdd, cacheHint = CacheHint.NONE)(drmMetadata.keyClassTag.asInstanceOf[ClassTag[Any]])
@@ -220,12 +221,11 @@ object SparkEngine extends DistributedEngine {
   }
 
   /**
-    * Convert non-int-keyed matrix to an int-keyed, computing optionally mapping from old keys
-    * to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix.
-    */
-  override def drm2IntKeyed[K](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) = {
-    implicit val ktag = drmX.keyClassTag
-    if (ktag == ClassTag.Int) {
+   * Convert non-int-keyed matrix to an int-keyed, computing optionally mapping from old keys
+   * to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix.
+   */
+  override def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) = {
+    if (classTag[K] == ClassTag.Int) {
+    if (classTag[K] == ClassTag.Int) {
       drmX.asInstanceOf[DrmLike[Int]] → None
     } else {
 
@@ -237,29 +237,26 @@ object SparkEngine extends DistributedEngine {
       val (intRdd, keyMap) = blas.rekeySeqInts(rdd = drmXcp.rdd, computeMap = computeMap)
 
       // Convert computed key mapping to a matrix.
-      val mxKeyMap = keyMap.map { rdd ⇒
-        drmWrap(rdd = rdd.map { case (key, ordinal) ⇒ key → (dvec(ordinal): Vector) }, ncol = 1, nrow = nrow)
+      val mxKeyMap = keyMap.map { rdd =>
+        drmWrap(rdd = rdd.map { case (key, ordinal) ⇒ key → (dvec(ordinal):Vector)}, ncol = 1, nrow = nrow)
       }
 
 
       drmWrap(rdd = intRdd, ncol = ncol) → mxKeyMap
-    }
+  }
 
   }
 
 
   /**
-    * (Optional) Sampling operation. Consistent with Spark semantics of the same.
-    *
-    * @param drmX
-    * @param fraction
-    * @param replacement
-    * @tparam K
-    * @return
-    */
-  override def drmSampleRows[K](drmX: DrmLike[K], fraction: Double, replacement: Boolean): DrmLike[K] = {
-
-    implicit val ktag = drmX.keyClassTag
+   * (Optional) Sampling operation. Consistent with Spark semantics of the same.
+   * @param drmX
+   * @param fraction
+   * @param replacement
+   * @tparam K
+   * @return
+   */
+  override def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean): DrmLike[K] = {
 
     // We do want to take ncol if already computed, if not, then we don't want to trigger computation
     // here.
@@ -268,14 +265,14 @@ object SparkEngine extends DistributedEngine {
       case _ ⇒ -1
     }
     val sample = drmX.rdd.sample(withReplacement = replacement, fraction = fraction)
-    if (ktag != ClassTag.Int) return drmWrap(sample, ncol = ncol)
+    if (classTag[K] != ClassTag.Int) return drmWrap(sample, ncol = ncol)
 
     // K == Int: Int-keyed sample. rebase int counts.
     drmWrap(rdd = blas.rekeySeqInts(rdd = sample, computeMap = false)._1, ncol = ncol).asInstanceOf[DrmLike[K]]
   }
 
 
-  override def drmSampleKRows[K](drmX: DrmLike[K], numSamples: Int, replacement: Boolean): Matrix = {
+  override def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples: Int, replacement: Boolean): Matrix = {
 
     val ncol = drmX match {
       case cp: CheckpointedDrmSpark[K] ⇒ cp._ncol
@@ -289,9 +286,9 @@ object SparkEngine extends DistributedEngine {
     val isSparse = sample.exists { case (_, vec) ⇒ !vec.isDense }
 
     val vectors = sample.map(_._2)
-    val labels = sample.view.zipWithIndex.map { case ((key, _), idx) ⇒ key.toString → (idx: Integer) }.toMap
+    val labels = sample.view.zipWithIndex.map { case ((key, _), idx) ⇒ key.toString → (idx:Integer) }.toMap
 
-    val mx: Matrix = if (isSparse) sparse(vectors: _*) else dense(vectors)
+    val mx:Matrix = if (isSparse) sparse(vectors:_*) else dense(vectors)
     mx.setRowLabelBindings(labels)
 
     mx
@@ -304,7 +301,7 @@ object SparkEngine extends DistributedEngine {
     case CacheHint.MEMORY_ONLY ⇒ StorageLevel.MEMORY_ONLY
     case CacheHint.MEMORY_ONLY_2 ⇒ StorageLevel.MEMORY_ONLY_2
     case CacheHint.MEMORY_ONLY_SER ⇒ StorageLevel.MEMORY_ONLY_SER
-    case CacheHint.MEMORY_ONLY_SER_2 ⇒ StorageLevel.MEMORY_ONLY_SER_2
+      case CacheHint.MEMORY_ONLY_SER_2 ⇒ StorageLevel.MEMORY_ONLY_SER_2
     case CacheHint.MEMORY_AND_DISK ⇒ StorageLevel.MEMORY_AND_DISK
     case CacheHint.MEMORY_AND_DISK_2 ⇒ StorageLevel.MEMORY_AND_DISK_2
     case CacheHint.MEMORY_AND_DISK_SER ⇒ StorageLevel.MEMORY_AND_DISK_SER
@@ -312,7 +309,7 @@ object SparkEngine extends DistributedEngine {
   }
 
   /** Translate previously optimized physical plan */
-  private def tr2phys[K](oper: DrmLike[K]): DrmRddInput[K] = {
+  private def tr2phys[K: ClassTag](oper: DrmLike[K]): DrmRddInput[K] = {
     // I do explicit evidence propagation here since matching via case classes seems to be loosing
     // it and subsequently may cause something like DrmRddInput[Any] instead of [Int] or [String].
     // Hence you see explicit evidence attached to all recursive exec() calls.
@@ -322,32 +319,28 @@ object SparkEngine extends DistributedEngine {
       // (we cannot do actual flip for non-int-keyed arguments)
       case OpAtAnyKey(_) ⇒
         throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.")
-      case op@OpAt(a) if op.keyClassTag == ClassTag.Int ⇒ At.at(op, tr2phys(a)).asInstanceOf[DrmRddInput[K]]
-      case op@OpABt(a, b) ⇒ ABt.abt(op, tr2phys(a), tr2phys(b))
-      case op@OpAtB(a, b) ⇒ AtB.atb(op, tr2phys(a), tr2phys(b)).asInstanceOf[DrmRddInput[K]]
-      case op@OpAtA(a) if op.keyClassTag == ClassTag.Int ⇒ AtA.at_a(op, tr2phys(a)).asInstanceOf[DrmRddInput[K]]
-      case op@OpAx(a, x) ⇒ Ax.ax_with_broadcast(op, tr2phys(a))
-      case op@OpAtx(a, x) if op.keyClassTag == ClassTag.Int ⇒
-        Ax.atx_with_broadcast(op, tr2phys(a)).asInstanceOf[DrmRddInput[K]]
-      case op@OpAewUnaryFunc(a, _, _) ⇒ AewB.a_ew_func(op, tr2phys(a))
-      case op@OpAewUnaryFuncFusion(a, _) ⇒ AewB.a_ew_func(op, tr2phys(a))
-      case op@OpAewB(a, b, opId) ⇒ AewB.a_ew_b(op, tr2phys(a), tr2phys(b))
-      case op@OpCbind(a, b) ⇒ CbindAB.cbindAB_nograph(op, tr2phys(a), tr2phys(b))
-      case op@OpCbindScalar(a, _, _) ⇒ CbindAB.cbindAScalar(op, tr2phys(a))
-      case op@OpRbind(a, b) ⇒ RbindAB.rbindAB(op, tr2phys(a), tr2phys(b))
-      case op@OpAewScalar(a, s, _) ⇒ AewB.a_ew_scalar(op, tr2phys(a), s)
-      case op@OpRowRange(a, _) if op.keyClassTag == ClassTag.Int ⇒
-        Slicing.rowRange(op, tr2phys(a)).asInstanceOf[DrmRddInput[K]]
-      case op@OpTimesRightMatrix(a, _) ⇒ AinCoreB.rightMultiply(op, tr2phys(a))
+      case op@OpAt(a) ⇒ At.at(op, tr2phys(a)(op.classTagA))
+      case op@OpABt(a, b) ⇒ ABt.abt(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpAtB(a, b) ⇒ AtB.atb(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpAtA(a) ⇒ AtA.at_a(op, tr2phys(a)(op.classTagA))
+      case op@OpAx(a, x) ⇒ Ax.ax_with_broadcast(op, tr2phys(a)(op.classTagA))
+      case op@OpAtx(a, x) ⇒ Ax.atx_with_broadcast(op, tr2phys(a)(op.classTagA))
+      case op@OpAewUnaryFunc(a, _, _) ⇒ AewB.a_ew_func(op, tr2phys(a)(op.classTagA))
+      case op@OpAewUnaryFuncFusion(a, _) ⇒ AewB.a_ew_func(op, tr2phys(a)(op.classTagA))
+      case op@OpAewB(a, b, opId) ⇒ AewB.a_ew_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpCbind(a, b) ⇒ CbindAB.cbindAB_nograph(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpCbindScalar(a, _, _) ⇒ CbindAB.cbindAScalar(op, tr2phys(a)(op.classTagA))
+      case op@OpRbind(a, b) ⇒ RbindAB.rbindAB(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+      case op@OpAewScalar(a, s, _) ⇒ AewB.a_ew_scalar(op, tr2phys(a)(op.classTagA), s)
+      case op@OpRowRange(a, _) ⇒ Slicing.rowRange(op, tr2phys(a)(op.classTagA))
+      case op@OpTimesRightMatrix(a, _) ⇒ AinCoreB.rightMultiply(op, tr2phys(a)(op.classTagA))
       // Custom operators, we just execute them
-      case blockOp: OpMapBlock[_, K] ⇒ MapBlock.exec(
-        src = tr2phys(blockOp.A),
+      case blockOp: OpMapBlock[K, _] ⇒ MapBlock.exec(
+        src = tr2phys(blockOp.A)(blockOp.classTagA),
         operator = blockOp
       )
-      case op@OpPar(a, _, _) ⇒ Par.exec(op, tr2phys(a))
-      case cp: CheckpointedDrm[K] ⇒
-        implicit val ktag=cp.keyClassTag
-        cp.rdd: DrmRddInput[K]
+      case op@OpPar(a, _, _) ⇒ Par.exec(op, tr2phys(a)(op.classTagA))
+      case cp: CheckpointedDrm[K] ⇒ cp.rdd: DrmRddInput[K]
       case _ ⇒ throw new IllegalArgumentException("Internal:Optimizer has no exec policy for operator %s."
         .format(oper))
 
@@ -355,34 +348,32 @@ object SparkEngine extends DistributedEngine {
   }
 
   /**
-    * Returns an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from default text
-    * delimited files. Reads a vector per row.
-    *
-    * @param src    a comma separated list of URIs to read from
-    * @param schema how the text file is formatted
-    */
+   * Returns an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from default text
+   * delimited files. Reads a vector per row.
+   * @param src a comma separated list of URIs to read from
+   * @param schema how the text file is formatted
+   */
   def indexedDatasetDFSRead(src: String,
-                            schema: Schema = DefaultIndexedDatasetReadSchema,
-                            existingRowIDs: Option[BiDictionary] = None)
-                           (implicit sc: DistributedContext):
-  IndexedDatasetSpark = {
+      schema: Schema = DefaultIndexedDatasetReadSchema,
+      existingRowIDs: Option[BiDictionary] = None)
+      (implicit sc: DistributedContext):
+    IndexedDatasetSpark = {
     val reader = new TextDelimitedIndexedDatasetReader(schema)(sc)
     val ids = reader.readRowsFrom(src, existingRowIDs)
     ids
   }
 
   /**
-    * Returns an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from default text
-    * delimited files. Reads an element per row.
-    *
-    * @param src    a comma separated list of URIs to read from
-    * @param schema how the text file is formatted
-    */
+   * Returns an [[org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark]] from default text
+   * delimited files. Reads an element per row.
+   * @param src a comma separated list of URIs to read from
+   * @param schema how the text file is formatted
+   */
   def indexedDatasetDFSReadElements(src: String,
-                                    schema: Schema = DefaultIndexedDatasetElementReadSchema,
-                                    existingRowIDs: Option[BiDictionary] = None)
-                                   (implicit sc: DistributedContext):
-  IndexedDatasetSpark = {
+      schema: Schema = DefaultIndexedDatasetElementReadSchema,
+      existingRowIDs: Option[BiDictionary] = None)
+      (implicit sc: DistributedContext):
+    IndexedDatasetSpark = {
     val reader = new TextDelimitedIndexedDatasetReader(schema)(sc)
     val ids = reader.readElementsFrom(src, existingRowIDs)
     ids

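The recurring change above replaces runtime-captured key tags (implicit val ktag = drm.keyClassTag) with a [K: ClassTag] context bound, and has tr2phys re-attach the evidence explicitly (op.classTagA / op.classTagB), since pattern matching on the logical operators does not recover the key's class tag by itself. A minimal, self-contained Scala sketch of that pattern follows; the names in it (Node, Leaf, Transpose, translate) are hypothetical and not part of Mahout.

import scala.reflect.ClassTag

// Hypothetical stand-ins for logical-plan nodes; each node carries its own key tag.
sealed trait Node[K] { def tag: ClassTag[K] }
case class Leaf[K](rows: Seq[(K, Double)])(implicit val tag: ClassTag[K]) extends Node[K]
case class Transpose[K](a: Node[K])(implicit val tag: ClassTag[K]) extends Node[K]

// The translator takes a context bound, and the recursive call re-attaches the
// evidence explicitly from the node, mirroring tr2phys(a)(op.classTagA) above.
def translate[K: ClassTag](node: Node[K]): String = node match {
  case t @ Transpose(a) => "t(" + translate(a)(t.tag) + ")"
  case Leaf(rows)       => "leaf[" + implicitly[ClassTag[K]] + "](" + rows.size + " rows)"
}

object EvidenceDemo extends App {
  println(translate(Transpose(Leaf(Seq(1 -> 1.0, 2 -> 2.0)))))  // prints: t(leaf[Int](2 rows))
}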
http://git-wip-us.apache.org/repos/asf/mahout/blob/fcd6b9e0/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
index 5142d3b..11e2bad 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
@@ -44,13 +44,13 @@ object ABt {
    * @param srcB B source RDD 
    * @tparam K
    */
-  def abt[K](
+  def abt[K: ClassTag](
       operator: OpABt[K],
       srcA: DrmRddInput[K],
       srcB: DrmRddInput[Int]): DrmRddInput[K] = {
 
     debug("operator AB'(Spark)")
-    abt_nograph(operator, srcA, srcB)(operator.keyClassTag)
+    abt_nograph(operator, srcA, srcB)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/mahout/blob/fcd6b9e0/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
index d8637d2..8a90398 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
@@ -17,18 +17,20 @@
 
 package org.apache.mahout.sparkbindings.blas
 
-import org.apache.mahout.logging._
-import org.apache.mahout.math._
-import org.apache.mahout.math.drm._
-import org.apache.mahout.math.drm.logical.{AbstractUnaryOp, OpAewB, OpAewScalar, TEwFunc}
-import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.sparkbindings.blas.AewB.{ReduceFunc, ReduceFuncScalar}
 import org.apache.mahout.sparkbindings.drm.DrmRddInput
+import scala.reflect.ClassTag
+import org.apache.spark.SparkContext._
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.{SequentialAccessSparseVector, Matrix, Vector}
+import org.apache.mahout.math.drm.logical.{AbstractUnaryOp, TEwFunc, OpAewScalar, OpAewB}
+import org.apache.mahout.sparkbindings.blas.AewB.{ReduceFuncScalar, ReduceFunc}
 import org.apache.mahout.sparkbindings.{BlockifiedDrmRdd, DrmRdd, drm}
-
-import scala.reflect.{ClassTag, classTag}
-import scala.collection.JavaConversions._
+import org.apache.mahout.math.drm._
+import org.apache.mahout.logging._
+import collection._
+import JavaConversions._
 
 /** Elementwise drm-drm operators */
 object AewB {
@@ -51,9 +53,7 @@ object AewB {
 
 
   /** Elementwise matrix-matrix operator, now handles both non- and identically partitioned */
-  def a_ew_b[K](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
-
-    implicit val ktag = op.keyClassTag
+  def a_ew_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
 
     val ewOps = getEWOps()
     val opId = op.op
@@ -111,16 +111,15 @@ object AewB {
     rdd
   }
 
-  def a_ew_func[K](op:AbstractUnaryOp[K,K] with TEwFunc, srcA: DrmRddInput[K]):DrmRddInput[K] = {
+  def a_ew_func[K:ClassTag](op:AbstractUnaryOp[K,K] with TEwFunc, srcA: DrmRddInput[K]):DrmRddInput[K] = {
 
     val evalZeros = op.evalZeros
     val inplace = ewInplace()
     val f = op.f
-    implicit val ktag = op.keyClassTag
 
     // Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing
     // rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar.
-    val aBlockRdd = if (classTag[K] == ClassTag.Int && op.A.canHaveMissingRows && evalZeros) {
+    val aBlockRdd = if (implicitly[ClassTag[K]] == ClassTag.Int && op.A.canHaveMissingRows && evalZeros) {
       val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
       drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
     } else {
@@ -150,13 +149,12 @@ object AewB {
   }
 
   /** Physical algorithm to handle matrix-scalar operators like A - s or s -: A */
-  def a_ew_scalar[K](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double):
+  def a_ew_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double):
   DrmRddInput[K] = {
 
 
     val ewOps = getEWOps()
     val opId = op.op
-    implicit val ktag = op.keyClassTag
 
     val reduceFunc = opId match {
       case "+" => ewOps.plusScalar
@@ -170,7 +168,7 @@ object AewB {
 
     // Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing
     // rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar.
-    val aBlockRdd = if (classTag[K] == ClassTag.Int && op.A.canHaveMissingRows) {
+    val aBlockRdd = if (implicitly[ClassTag[K]] == ClassTag.Int && op.A.canHaveMissingRows) {
       val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
       drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
     } else {

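The implicitly[ClassTag[K]] == ClassTag.Int test used in this file is a runtime dispatch on the key type supplied by the context bound; classTag[K] resolves to the same evidence object. A tiny, hypothetical Scala sketch of the idiom (describeKeys is a made-up name, not Mahout code):

import scala.reflect.{ClassTag, classTag}

// Choose a code path based on the runtime key type carried by the context bound.
def describeKeys[K: ClassTag](keys: Seq[K]): String =
  if (classTag[K] == ClassTag.Int) "int-keyed (" + keys.size + " rows, missing-row fixup may apply)"
  else classTag[K].toString + "-keyed (" + keys.size + " rows)"

object KeyCheckDemo extends App {
  println(describeKeys(Seq(1, 2, 3)))   // int-keyed (3 rows, missing-row fixup may apply)
  println(describeKeys(Seq("a", "b")))  // java.lang.String-keyed (2 rows)
}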
http://git-wip-us.apache.org/repos/asf/mahout/blob/fcd6b9e0/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
index 6fe076e..5f9f84a 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
@@ -17,10 +17,7 @@ object AinCoreB {
 
   private final implicit val log = getLog(AinCoreB.getClass)
 
-  def rightMultiply[K](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
-
-    implicit val ktag = op.keyClassTag
-
+  def rightMultiply[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
     if ( op.right.isInstanceOf[DiagonalMatrix])
       rightMultiply_diag(op, srcA)
     else

http://git-wip-us.apache.org/repos/asf/mahout/blob/fcd6b9e0/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
index 42e56e7..629accd 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
@@ -13,11 +13,10 @@ import org.apache.mahout.math.drm.logical.{OpAx, OpAtx}
 /** Matrix product with one of operands an in-core matrix */
 object Ax {
 
-  def ax_with_broadcast[K](op: OpAx[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
+  def ax_with_broadcast[K: ClassTag](op: OpAx[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
 
     val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol)
     implicit val sc: DistributedContext = rddA.sparkContext
-    implicit val ktag = op.keyClassTag
 
     val bcastX = drmBroadcast(op.x)
 

http://git-wip-us.apache.org/repos/asf/mahout/blob/fcd6b9e0/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
index e900749..4a379ec 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
@@ -32,9 +32,7 @@ object CbindAB {
 
   private val log = Logger.getLogger(CbindAB.getClass)
 
-  def cbindAScalar[K](op: OpCbindScalar[K], srcA:DrmRddInput[K]) : DrmRddInput[K] = {
-
-    implicit val ktag = op.keyClassTag
+  def cbindAScalar[K:ClassTag](op: OpCbindScalar[K], srcA:DrmRddInput[K]) : DrmRddInput[K] = {
     val srcRdd = srcA.toDrmRdd()
 
     val ncol = op.A.ncol
@@ -62,14 +60,13 @@ object CbindAB {
     resultRdd
   }
 
-  def cbindAB_nograph[K](op: OpCbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
+  def cbindAB_nograph[K: ClassTag](op: OpCbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
 
     val a = srcA.toDrmRdd()
     val b = srcB.toDrmRdd()
     val n = op.ncol
     val n1 = op.A.ncol
     val n2 = n - n1
-    implicit val ktag = op.keyClassTag
 
     // Check if A and B are identically partitioned AND keyed. if they are, then just perform zip
     // instead of join, and apply the op map-side. Otherwise, perform join and apply the op

http://git-wip-us.apache.org/repos/asf/mahout/blob/fcd6b9e0/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
index 6104d83..4cd9a74 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
@@ -23,7 +23,7 @@ import RLikeOps._
 import org.apache.mahout.math.{SequentialAccessSparseVector, DenseVector}
 import org.apache.mahout.sparkbindings.DrmRdd
 
-class DrmRddOps[K](private[blas] val rdd: DrmRdd[K]) {
+class DrmRddOps[K: ClassTag](private[blas] val rdd: DrmRdd[K]) {
 
   /** Turn RDD into dense row-wise vectors if density threshold is exceeded. */
   def densify(threshold: Double = 0.80): DrmRdd[K] = rdd.map({

http://git-wip-us.apache.org/repos/asf/mahout/blob/fcd6b9e0/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
index 7e48ed8..2933ddc 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
@@ -25,14 +25,13 @@ import scala.reflect.ClassTag
 
 object MapBlock {
 
-  def exec[S, R](src: DrmRddInput[S], operator:OpMapBlock[S,R]): DrmRddInput[R] = {
+  def exec[S, R:ClassTag](src: DrmRddInput[S], operator:OpMapBlock[S,R]): DrmRddInput[R] = {
 
     // We can't use attributes directly in the closure in order to avoid putting the whole object
     // into closure.
     val bmf = operator.bmf
     val ncol = operator.ncol
-    implicit val rtag = operator.keyClassTag
-    src.toBlockifiedDrmRdd(operator.A.ncol).map(blockTuple => {
+    val rdd = src.toBlockifiedDrmRdd(operator.A.ncol).map(blockTuple => {
       val out = bmf(blockTuple)
 
       assert(out._2.nrow == blockTuple._2.nrow, "block mapping must return same number of rows.")
@@ -40,6 +39,8 @@ object MapBlock {
 
       out
     })
+
+    rdd
   }
 
 }

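The comment in MapBlock.exec about not using attributes directly in the closure refers to a general Spark idiom: copy only the fields a task needs into local vals so the closure does not drag the whole operator object into serialization. A hypothetical, self-contained sketch of that idiom (ScaleOp and scaleAll are made-up names, not Mahout code):

import org.apache.spark.{SparkConf, SparkContext}

// A non-serializable holder standing in for a heavyweight operator object.
class ScaleOp(val factor: Double, val label: String)

object ClosureCaptureDemo {

  def scaleAll(sc: SparkContext, op: ScaleOp): Array[Double] = {
    // Copy just the needed field into a local val: referencing op.factor inside map()
    // would capture the whole (non-serializable) ScaleOp and fail with "Task not serializable".
    val factor = op.factor
    sc.parallelize(1 to 5).map(_ * factor).collect()
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("closure-capture").setMaster("local[2]"))
    println(scaleAll(sc, new ScaleOp(2.0, "demo")).mkString(", "))  // 2.0, 4.0, 6.0, 8.0, 10.0
    sc.stop()
  }
}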
http://git-wip-us.apache.org/repos/asf/mahout/blob/fcd6b9e0/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
index 7e32b69..0434a72 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
@@ -15,9 +15,8 @@ object Par {
 
   private final implicit val log = getLog(Par.getClass)
 
-  def exec[K](op: OpPar[K], src: DrmRddInput[K]): DrmRddInput[K] = {
+  def exec[K: ClassTag](op: OpPar[K], src: DrmRddInput[K]): DrmRddInput[K] = {
 
-    implicit val ktag = op.keyClassTag
     val srcBlockified = src.isBlockified
 
     val srcRdd = if (srcBlockified) src.toBlockifiedDrmRdd(op.ncol) else src.toDrmRdd()

http://git-wip-us.apache.org/repos/asf/mahout/blob/fcd6b9e0/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
index 14772f6..62abba6 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
@@ -27,9 +27,7 @@ object RbindAB {
 
   private val log = Logger.getLogger(RbindAB.getClass)
 
-  def rbindAB[K](op: OpRbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
-
-    implicit val ktag = op.keyClassTag
+  def rbindAB[K: ClassTag](op: OpRbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
 
     // If any of the inputs is blockified, use blockified inputs
     if (srcA.isBlockified || srcB.isBlockified) {

http://git-wip-us.apache.org/repos/asf/mahout/blob/fcd6b9e0/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
index 5a83f80..6b8513f 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
@@ -35,7 +35,7 @@ import JavaConversions._
  */
 package object blas {
 
-  implicit def drmRdd2ops[K](rdd: DrmRdd[K]): DrmRddOps[K] = new DrmRddOps[K](rdd)
+  implicit def drmRdd2ops[K: ClassTag](rdd: DrmRdd[K]): DrmRddOps[K] = new DrmRddOps[K](rdd)
 
 
   /**
@@ -46,7 +46,7 @@ package object blas {
    * @tparam K existing key parameter
    * @return
    */
-  private[mahout] def rekeySeqInts[K](rdd: DrmRdd[K], computeMap: Boolean = true): (DrmRdd[Int],
+  private[mahout] def rekeySeqInts[K: ClassTag](rdd: DrmRdd[K], computeMap: Boolean = true): (DrmRdd[Int],
     Option[RDD[(K, Int)]]) = {
 
     // Spark context please.

http://git-wip-us.apache.org/repos/asf/mahout/blob/fcd6b9e0/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
index 3c086fe..abcfc64 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
@@ -1,17 +1,16 @@
 package org.apache.mahout.sparkbindings.drm
 
 import org.apache.mahout.math.drm.CheckpointedDrm
-import org.apache.mahout.sparkbindings.DrmRdd
 import scala.reflect.ClassTag
 
 /** Additional Spark-specific operations. Requires underlying DRM to be running on Spark backend. */
-class CheckpointedDrmSparkOps[K](drm: CheckpointedDrm[K]) {
+class CheckpointedDrmSparkOps[K: ClassTag](drm: CheckpointedDrm[K]) {
 
   assert(drm.isInstanceOf[CheckpointedDrmSpark[K]], "must be a Spark-backed matrix")
 
   private[sparkbindings] val sparkDrm = drm.asInstanceOf[CheckpointedDrmSpark[K]]
 
   /** Spark matrix customization exposure */
-  def rdd:DrmRdd[K] = sparkDrm.rddInput.toDrmRdd()
+  def rdd = sparkDrm.rddInput.toDrmRdd()
 
 }

http://git-wip-us.apache.org/repos/asf/mahout/blob/fcd6b9e0/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
index b793098..e18d6da 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
@@ -37,15 +37,15 @@ package object drm {
 
   private[drm] final val log = Logger.getLogger("org.apache.mahout.sparkbindings");
 
-  private[sparkbindings] implicit def cpDrm2DrmRddInput[K](cp: CheckpointedDrmSpark[K]): DrmRddInput[K] =
+  private[sparkbindings] implicit def cpDrm2DrmRddInput[K: ClassTag](cp: CheckpointedDrmSpark[K]): DrmRddInput[K] =
     cp.rddInput
 
-  private[sparkbindings] implicit def cpDrmGeneric2DrmRddInput[K](cp: CheckpointedDrm[K]): DrmRddInput[K] =
+  private[sparkbindings] implicit def cpDrmGeneric2DrmRddInput[K: ClassTag](cp: CheckpointedDrm[K]): DrmRddInput[K] =
     cp.asInstanceOf[CheckpointedDrmSpark[K]]
 
-  private[sparkbindings] implicit def drmRdd2drmRddInput[K:ClassTag](rdd: DrmRdd[K]) = new DrmRddInput[K](Left(rdd))
+  private[sparkbindings] implicit def drmRdd2drmRddInput[K: ClassTag](rdd: DrmRdd[K]) = new DrmRddInput[K](Left(rdd))
 
-  private[sparkbindings] implicit def blockifiedRdd2drmRddInput[K:ClassTag](rdd: BlockifiedDrmRdd[K]) = new
+  private[sparkbindings] implicit def blockifiedRdd2drmRddInput[K: ClassTag](rdd: BlockifiedDrmRdd[K]) = new
       DrmRddInput[K](
     Right(rdd))
 

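These package-level implicit conversions take [K: ClassTag] because the wrappers they produce need the key's class tag at runtime. A hypothetical miniature, not Mahout code, of why an implicit view has to forward that requirement as its own context bound (KeyedRows and seq2keyedRows are made-up names):

import scala.language.implicitConversions
import scala.reflect.ClassTag

// A wrapper that genuinely needs ClassTag[K]: toArray cannot build an Array[K] without it.
class KeyedRows[K: ClassTag](val rows: Seq[(K, Double)]) {
  def keysArray: Array[K] = rows.map(_._1).toArray
}

object Conversions {
  // The implicit view can only exist if it, too, demands the evidence.
  implicit def seq2keyedRows[K: ClassTag](rows: Seq[(K, Double)]): KeyedRows[K] =
    new KeyedRows[K](rows)
}

object ConversionDemo extends App {
  import Conversions._
  val rows: Seq[(String, Double)] = Seq("a" -> 1.0, "b" -> 2.0)
  // The conversion fires here; the compiler supplies ClassTag[String] implicitly.
  println(rows.keysArray.mkString(", "))  // a, b
}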
http://git-wip-us.apache.org/repos/asf/mahout/blob/fcd6b9e0/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index de309c3..330ae38 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -108,10 +108,10 @@ package object sparkbindings {
   implicit def sb2bc[T](b: Broadcast[T]): BCast[T] = new SparkBCast(b)
 
   /** Adding Spark-specific ops */
-  implicit def cpDrm2cpDrmSparkOps[K](drm: CheckpointedDrm[K]): CheckpointedDrmSparkOps[K] =
+  implicit def cpDrm2cpDrmSparkOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedDrmSparkOps[K] =
     new CheckpointedDrmSparkOps[K](drm)
 
-  implicit def drm2cpDrmSparkOps[K](drm: DrmLike[K]): CheckpointedDrmSparkOps[K] = drm: CheckpointedDrm[K]
+  implicit def drm2cpDrmSparkOps[K: ClassTag](drm: DrmLike[K]): CheckpointedDrmSparkOps[K] = drm: CheckpointedDrm[K]
 
   private[sparkbindings] implicit def m2w(m: Matrix): MatrixWritable = new MatrixWritable(m)
 
