Repository: mahout
Updated Branches:
  refs/heads/flink-binding 73649fa84 -> a168d238d
NoJira: removed unnecessary references and imports of scala.ClassTag

Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/a168d238
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/a168d238
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/a168d238

Branch: refs/heads/flink-binding
Commit: a168d238d13fabfe11afe6870b38e09534dc65d6
Parents: 73649fa
Author: smarthi <[email protected]>
Authored: Wed Mar 9 19:55:04 2016 -0500
Committer: smarthi <[email protected]>
Committed: Wed Mar 9 19:59:44 2016 -0500

----------------------------------------------------------------------
 .../apache/mahout/h2o/common/DrmMetadata.scala  | 14 ++++-----
 .../mahout/h2obindings/ops/MapBlockHelper.scala |  2 +-
 .../mahout/math/drm/CheckpointedDrm.scala       |  1 -
 .../mahout/math/drm/DrmDoubleScalarOps.scala    |  8 ++---
 .../math/drm/logical/AbstractBinaryOp.scala     |  1 -
 .../math/drm/logical/AbstractUnaryOp.scala      |  1 -
 .../math/drm/logical/CheckpointAction.scala     |  4 ---
 .../math/drm/logical/OpAewUnaryFunc.scala       |  3 --
 .../math/drm/logical/OpAewUnaryFuncFusion.scala |  1 -
 .../mahout/math/drm/logical/OpCbind.scala       |  1 -
 .../mahout/math/drm/logical/OpCbindScalar.scala |  1 -
 .../apache/mahout/math/drm/logical/OpPar.scala  |  1 -
 .../mahout/math/drm/logical/OpRbind.scala       |  1 -
 .../math/drm/logical/OpTimesRightMatrix.scala   |  1 -
 .../org/apache/mahout/math/drm/package.scala    |  1 -
 .../mahout/math/drm/DrmLikeSuiteBase.scala      |  1 -
 .../classifier/naivebayes/SparkNaiveBayes.scala |  3 +-
 .../org/apache/mahout/common/DrmMetadata.scala  | 14 ++++-----
 .../mahout/sparkbindings/SparkEngine.scala      | 30 ++++++++----------
 .../apache/mahout/sparkbindings/blas/ABt.scala  | 26 ++++++++--------
 .../apache/mahout/sparkbindings/blas/AewB.scala |  7 ++---
 .../mahout/sparkbindings/blas/AinCoreB.scala    |  4 +--
 .../apache/mahout/sparkbindings/blas/AtB.scala  | 25 +++++++--------
 .../mahout/sparkbindings/blas/CbindAB.scala     |  3 +-
 .../mahout/sparkbindings/blas/DrmRddOps.scala   |  1 -
 .../mahout/sparkbindings/blas/MapBlock.scala    |  4 +--
 .../apache/mahout/sparkbindings/blas/Par.scala  | 20 +++++-------
 .../mahout/sparkbindings/blas/RbindAB.scala     |  1 -
 .../mahout/sparkbindings/blas/package.scala     | 21 ++++++-------
 .../drm/CheckpointedDrmSpark.scala              | 10 +++---
 .../drm/CheckpointedDrmSparkOps.scala           |  1 -
 .../mahout/sparkbindings/drm/package.scala      | 26 ++++++----------
 .../apache/mahout/sparkbindings/package.scala   |  4 +--
 .../sparkbindings/drm/DrmLikeOpsSuite.scala     | 32 +++++++++-----------
 34 files changed, 110 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/h2o/src/main/scala/org/apache/mahout/h2o/common/DrmMetadata.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2o/common/DrmMetadata.scala b/h2o/src/main/scala/org/apache/mahout/h2o/common/DrmMetadata.scala
index 3eb0974..33aafef 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2o/common/DrmMetadata.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2o/common/DrmMetadata.scala
@@ -25,13 +25,13 @@ class DrmMetadata(
       keyW2ValFunc: ((Writable) => Any)
       ) = keyTypeWritable match {
-    case cz if (cz == classOf[IntWritable]) => ClassTag.Int -> w2int _
-    case cz if (cz == classOf[LongWritable]) => ClassTag.Long -> w2long _
-    case cz if (cz == classOf[DoubleWritable]) => ClassTag.Double -> w2double _
-    case cz if (cz == classOf[FloatWritable]) => ClassTag.Float -> w2float _
-    case cz if (cz == classOf[Text]) => ClassTag(classOf[String]) -> w2string _
-    case cz if (cz == classOf[BooleanWritable]) => ClassTag(classOf[Boolean]) -> w2bool _
-    case cz if (cz == classOf[BytesWritable]) => ClassTag(classOf[Array[Byte]]) -> w2bytes _
+    case cz if cz == classOf[IntWritable] => ClassTag.Int -> w2int _
+    case cz if cz == classOf[LongWritable] => ClassTag.Long -> w2long _
+    case cz if cz == classOf[DoubleWritable] => ClassTag.Double -> w2double _
+    case cz if cz == classOf[FloatWritable] => ClassTag.Float -> w2float _
+    case cz if cz == classOf[Text] => ClassTag(classOf[String]) -> w2string _
+    case cz if cz == classOf[BooleanWritable] => ClassTag(classOf[Boolean]) -> w2bool _
+    case cz if cz == classOf[BytesWritable] => ClassTag(classOf[Array[Byte]]) -> w2bytes _
     case _ => throw new IllegalArgumentException(s"Unsupported DRM key type:${keyTypeWritable.getName}")
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/h2o/src/main/scala/org/apache/mahout/h2obindings/ops/MapBlockHelper.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/ops/MapBlockHelper.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/ops/MapBlockHelper.scala
index 2bc23e5..552fd9e 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/ops/MapBlockHelper.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/ops/MapBlockHelper.scala
@@ -37,7 +37,7 @@ object MapBlockHelper {
       case `s` =>
         val arr = new Array[String](in.rowSize)
         val vstr = new ValueString
-        for (i <- 0 to (in.rowSize - 1)) {
+        for (i <- 0 until in.rowSize) {
           arr(i) = labels.atStr(vstr, i + startlong).toString
         }
         arr

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
index 9a08740..31f8097 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
@@ -19,7 +19,6 @@ package org.apache.mahout.math.drm
 
 import org.apache.mahout.math.Matrix
 import org.apache.mahout.math.drm.CacheHint.CacheHint
-import scala.reflect.ClassTag
 
 /**
  * Checkpointed DRM API. This is a matrix that has optimized RDD lineage behind it and can be

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala
index 96ef893..de03776 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala
@@ -17,12 +17,10 @@
 package org.apache.mahout.math.drm
 
-import RLikeDrmOps._
-import org.apache.mahout.math._
+import org.apache.mahout.math.drm.RLikeDrmOps._
 import org.apache.mahout.math.drm.logical.OpCbindScalar
-import scalabindings._
-import RLikeOps._
-import reflect.ClassTag
+
+import scala.reflect.ClassTag
 
 class DrmDoubleScalarOps(val x:Double) extends AnyVal{

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
index 9fba286..ba41657 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractBinaryOp.scala
@@ -17,7 +17,6 @@
 package org.apache.mahout.math.drm.logical
 
-import scala.reflect.ClassTag
 import org.apache.mahout.math.drm.{DistributedContext, DrmLike}
 
 /**

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
index 28cf87d..6a70aec 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/AbstractUnaryOp.scala
@@ -17,7 +17,6 @@
 package org.apache.mahout.math.drm.logical
 
-import scala.reflect.ClassTag
 import org.apache.mahout.math.drm.{DistributedContext, DrmLike}
 
 /** Abstract unary operator */

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
index cd70631..aa1d8bc 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/CheckpointAction.scala
@@ -17,16 +17,12 @@
 package org.apache.mahout.math.drm.logical
 
-import scala.reflect.{ClassTag, classTag}
 import scala.util.Random
 import org.apache.mahout.math.drm._
 
 /** Implementation of distributed expression checkpoint and optimizer. */
 abstract class CheckpointAction[K] extends DrmLike[K] {
-
-  override val keyClassTag: ClassTag[K] = classTag[K]
-
   protected[mahout] lazy val partitioningTag: Long = Random.nextLong()
 
   private[mahout] var cp:Option[CheckpointedDrm[K]] = None

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala
index 6f93980..0607686 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFunc.scala
@@ -21,9 +21,6 @@ import scala.reflect.ClassTag
 import org.apache.mahout.math.drm.DrmLike
 import scala.util.Random
 
-/**
- * @author dmitriy
- */
 case class OpAewUnaryFunc[K](
     override var A: DrmLike[K],
     val f: (Double) => Double,

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala
index 5b0133f..19bdc64 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpAewUnaryFuncFusion.scala
@@ -20,7 +20,6 @@ package org.apache.mahout.math.drm.logical
 import scala.reflect.ClassTag
 import org.apache.mahout.math.drm.DrmLike
 import scala.util.Random
-import collection._
 
 /**
  * Composition of unary elementwise functions.

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala
index 932f133..cbc20ae 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbind.scala
@@ -17,7 +17,6 @@
 package org.apache.mahout.math.drm.logical
 
-import reflect.ClassTag
 import org.apache.mahout.math.drm.DrmLike
 import scala.util.Random

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala
index 99c2bfa..c3775ed 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpCbindScalar.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.mahout.math.drm.logical
 
-import reflect.ClassTag
 import org.apache.mahout.math.drm.DrmLike
 
 case class OpCbindScalar[K](

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala
index 0fadce3..2402b1f 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpPar.scala
@@ -1,7 +1,6 @@
 package org.apache.mahout.math.drm.logical
 
 import org.apache.mahout.math.drm.DrmLike
-import scala.reflect.ClassTag
 
 /** Parallelism operator */
 case class OpPar[K](

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala
index f8c1059..1c67868 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpRbind.scala
@@ -17,7 +17,6 @@
 package org.apache.mahout.math.drm.logical
 
-import scala.reflect.ClassTag
 import org.apache.mahout.math.drm.DrmLike
 import scala.util.Random

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala
index 1b12035..94104bb 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/logical/OpTimesRightMatrix.scala
@@ -17,7 +17,6 @@
 package org.apache.mahout.math.drm.logical
 
-import scala.reflect.ClassTag
 import org.apache.mahout.math.Matrix
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
index 902113a..ecb557b 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -18,7 +18,6 @@ package org.apache.mahout.math
 
 import org.apache.mahout.math.drm._
-import org.apache.mahout.math.indexeddataset.{IndexedDataset, DefaultIndexedDatasetReadSchema, Schema}
 import org.apache.mahout.math.scalabindings.RLikeOps._
 import org.apache.mahout.math.scalabindings._

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
index f215fb7..41814d8 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
@@ -22,7 +22,6 @@ import org.scalatest.{FunSuite, Matchers}
 import org.apache.mahout.math._
 import scalabindings._
 import RLikeOps._
-import RLikeDrmOps._
 import scala.reflect.ClassTag
 
 /** Common DRM tests to be run by all distributed engines. */

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/spark/src/main/scala/org/apache/mahout/classifier/naivebayes/SparkNaiveBayes.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/classifier/naivebayes/SparkNaiveBayes.scala b/spark/src/main/scala/org/apache/mahout/classifier/naivebayes/SparkNaiveBayes.scala
index f76a3f9..60c40e8 100644
--- a/spark/src/main/scala/org/apache/mahout/classifier/naivebayes/SparkNaiveBayes.scala
+++ b/spark/src/main/scala/org/apache/mahout/classifier/naivebayes/SparkNaiveBayes.scala
@@ -30,7 +30,6 @@ import scala.reflect.ClassTag
 import scala.language.asInstanceOf
 import collection._
 import JavaConversions._
-import org.apache.spark.SparkContext._
 import org.apache.mahout.sparkbindings._
 
@@ -82,7 +81,7 @@ object SparkNaiveBayes extends NaiveBayes{
     // todo: has to be a better way of creating this map
     val categoryArray = aggregatedRdd.keys.takeOrdered(aggregatedRdd.count.toInt)
-    for(i <- 0 until categoryArray.size){
+    for(i <- categoryArray.indices){
       labelIndexMap.put(categoryArray(i), categoryIndex)
       categoryIndex += 1
     }

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/spark/src/main/scala/org/apache/mahout/common/DrmMetadata.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/common/DrmMetadata.scala b/spark/src/main/scala/org/apache/mahout/common/DrmMetadata.scala
index 0aba319..ed7771d 100644
--- a/spark/src/main/scala/org/apache/mahout/common/DrmMetadata.scala
+++ b/spark/src/main/scala/org/apache/mahout/common/DrmMetadata.scala
@@ -42,13 +42,13 @@ class DrmMetadata(
       keyW2ValFunc: ((Writable) => Any)
       ) = keyTypeWritable match {
-    case cz if (cz == classOf[IntWritable]) => ClassTag.Int -> w2int _
-    case cz if (cz == classOf[LongWritable]) => ClassTag.Long -> w2long _
-    case cz if (cz == classOf[DoubleWritable]) => ClassTag.Double -> w2double _
-    case cz if (cz == classOf[FloatWritable]) => ClassTag.Float -> w2float _
-    case cz if (cz == classOf[Text]) => ClassTag(classOf[String]) -> w2string _
-    case cz if (cz == classOf[BooleanWritable]) => ClassTag(classOf[Boolean]) -> w2bool _
-    case cz if (cz == classOf[BytesWritable]) => ClassTag(classOf[Array[Byte]]) -> w2bytes _
+    case cz if cz == classOf[IntWritable] => ClassTag.Int -> w2int _
+    case cz if cz == classOf[LongWritable] => ClassTag.Long -> w2long _
+    case cz if cz == classOf[DoubleWritable] => ClassTag.Double -> w2double _
+    case cz if cz == classOf[FloatWritable] => ClassTag.Float -> w2float _
+    case cz if cz == classOf[Text] => ClassTag(classOf[String]) -> w2string _
+    case cz if cz == classOf[BooleanWritable] => ClassTag(classOf[Boolean]) -> w2bool _
+    case cz if cz == classOf[BytesWritable] => ClassTag(classOf[Array[Byte]]) -> w2bytes _
     case _ => throw new IllegalArgumentException(s"Unsupported DRM key type:${keyTypeWritable.getName}")
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 8050ef3..3200288 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -17,25 +17,23 @@
 package org.apache.mahout.sparkbindings
 
+import org.apache.hadoop.io._
+import org.apache.mahout.common.{HDFSUtil, Hadoop1HDFSUtil}
 import org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader
 import org.apache.mahout.math._
-import org.apache.mahout.math.indexeddataset.{BiDictionary, DefaultIndexedDatasetReadSchema, Schema, DefaultIndexedDatasetElementReadSchema}
-import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
-import scalabindings._
-import RLikeOps._
+import org.apache.mahout.math.drm._
 import org.apache.mahout.math.drm.logical._
-import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSparkOps, cpDrmGeneric2DrmRddInput, CheckpointedDrmSpark, DrmRddInput}
-import org.apache.mahout.math._
-import scala.reflect.ClassTag
-import scala.reflect.classTag
-import org.apache.spark.storage.StorageLevel
+import org.apache.mahout.math.indexeddataset.{BiDictionary, DefaultIndexedDatasetElementReadSchema, DefaultIndexedDatasetReadSchema, Schema}
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.sparkbindings.blas._
-import org.apache.hadoop.io._
-import collection._
-import JavaConversions._
-import org.apache.mahout.math.drm._
-import org.apache.mahout.common.{Hadoop1HDFSUtil, HDFSUtil}
+import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSpark, DrmRddInput, cpDrmGeneric2DrmRddInput}
+import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
+import org.apache.spark.storage.StorageLevel
+import scala.collection.JavaConversions._
+import scala.collection._
+import scala.reflect.ClassTag
 
 /** Spark-specific non-drm-method operations */
 object SparkEngine extends DistributedEngine {
 
@@ -99,9 +97,7 @@ object SparkEngine extends DistributedEngine {
   /** Optional engine-specific all reduce tensor operation. */
   override def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix = {
-
-    import drm._
-    drm.asBlockified(ncol = drm.ncol).map(bmf(_)).reduce(rf)
+    drm.toBlockifiedDrmRdd(ncol = drm.ncol).map(bmf(_)).reduce(rf)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
index c35571d..ffb164c 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
@@ -78,10 +78,10 @@ object ABt {
     val prodNRow = operator.nrow
     // We are actually computing AB' here.
     val numProductPartitions = estimateProductPartitions(anrow = prodNRow, ancol = operator.A.ncol,
-      bncol = prodNCol, aparts = blocksA.partitions.size, bparts = blocksB.partitions.size)
+      bncol = prodNCol, aparts = blocksA.partitions.length, bparts = blocksB.partitions.length)
 
     debug(
-      s"AB': #parts = $numProductPartitions; A #parts=${blocksA.partitions.size}, B #parts=${blocksB.partitions.size}."+
+      s"AB': #parts = $numProductPartitions; A #parts=${blocksA.partitions.length}, B #parts=${blocksB.partitions.length}."+
       s"A=${operator.A.nrow}x${operator.A.ncol}, B=${operator.B.nrow}x${operator.B.ncol},AB'=${prodNRow}x$prodNCol."
     )
 
@@ -93,7 +93,7 @@ object ABt {
       var ms = traceDo(System.currentTimeMillis())
 
       // We need to send keysB to the aggregator in order to know which columns are being updated.
-      val result = (keysA, keysB, (blockA %*% blockB.t))
+      val result = (keysA, keysB, blockA %*% blockB.t)
 
       ms = traceDo(System.currentTimeMillis() - ms.get)
       trace(
@@ -108,7 +108,7 @@ object ABt {
     val blockwiseMmulRdd =
 
     // Combine blocks pairwise.
-      pairwiseApply(blocksA, blocksB, mmulFunc _)
+      pairwiseApply(blocksA, blocksB, mmulFunc)
 
         // Now reduce proper product blocks.
         .combineByKey(
@@ -136,7 +136,7 @@
             comb1
           },
 
-          numPartitions = blocksA.partitions.size max blocksB.partitions.size
+          numPartitions = blocksA.partitions.length max blocksB.partitions.length
         )
 
@@ -146,12 +146,12 @@ object ABt {
       // throw away A-partition #
       .map{case (_,tuple) => tuple}
 
-    val numPartsResult = blockifiedRdd.partitions.size
+    val numPartsResult = blockifiedRdd.partitions.length
 
     // See if we need to rebalance away from A granularity.
     if (numPartsResult * 2 < numProductPartitions || numPartsResult / 2 > numProductPartitions) {
-      debug(s"Will re-coalesce from ${numPartsResult} to ${numProductPartitions}")
+      debug(s"Will re-coalesce from $numPartsResult to $numProductPartitions")
 
       val rowRdd = deblockify(blockifiedRdd).coalesce(numPartitions = numProductPartitions)
 
@@ -186,13 +186,13 @@
       val r = if (blockIter.hasNext) Some(part -> blockIter.next) else Option.empty[(Int, BlockifiedDrmTuple[K1])]
-      require(blockIter.hasNext == false, s"more than 1 (${blockIter.size + 1}) blocks per partition and A of AB'")
+      require(!blockIter.hasNext, s"more than 1 (${blockIter.size + 1}) blocks per partition and A of AB'")
       r.toIterator
     }
 
     // Prepare B-side.
-    val aParts = blocksA.partitions.size
+    val aParts = blocksA.partitions.length
     val blocksBKeyed = blocksB.flatMap(bTuple => for (blockKey <- (0 until aParts).view) yield blockKey -> bTuple )
 
     // Perform the inner join. Let's try to do a simple thing now.
@@ -241,8 +241,8 @@
     // elements per partition. TODO: do it better.
 
     // Elements per partition, bigger of two operands.
-    val epp = aNCol.toDouble * prodNRow / blocksA.partitions.size max aNCol.toDouble * prodNCol /
-      blocksB.partitions.size
+    val epp = aNCol.toDouble * prodNRow / blocksA.partitions.length max aNCol.toDouble * prodNCol /
+      blocksB.partitions.length
 
     // Number of partitions we want to converge to in the product. For now we simply extrapolate that
     // assuming product density and operand densities being about the same; and using the same element
@@ -250,7 +250,7 @@
     val numProductPartitions = (prodNCol.toDouble * prodNRow / epp).ceil.toInt
 
     debug(
-      s"AB': #parts = $numProductPartitions; A #parts=${blocksA.partitions.size}, B #parts=${blocksB.partitions.size}.")
+      s"AB': #parts = $numProductPartitions; A #parts=${blocksA.partitions.length}, B #parts=${blocksB.partitions.length}.")
 
     // The plan.
     var blockifiedRdd: BlockifiedDrmRdd[K] = blocksA
@@ -286,7 +286,7 @@
       // Accumulator is a row-wise block of sparse vectors. Since we assign to columns,
      // the most efficient is perhaps to create column-oriented block here.
-      val acc:Matrix = new SparseRowMatrix(prodNCol, rowKeys.size).t
+      val acc:Matrix = new SparseRowMatrix(prodNCol, rowKeys.length).t
 
       // Update accumulator using colKeys as column index indirection
       colKeys.view.zipWithIndex.foreach({

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
index d8637d2..309742f 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
@@ -22,7 +22,6 @@ import org.apache.mahout.math._
 import org.apache.mahout.math.drm._
 import org.apache.mahout.math.drm.logical.{AbstractUnaryOp, OpAewB, OpAewScalar, TEwFunc}
 import org.apache.mahout.math.scalabindings.RLikeOps._
-import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.sparkbindings.blas.AewB.{ReduceFunc, ReduceFuncScalar}
 import org.apache.mahout.sparkbindings.drm.DrmRddInput
 import org.apache.mahout.sparkbindings.{BlockifiedDrmRdd, DrmRdd, drm}
@@ -70,7 +69,7 @@ object AewB {
     val a = srcA.toDrmRdd()
     val b = srcB.toDrmRdd()
 
-    debug(s"A${op.op}B: #partsA=${a.partitions.size},#partsB=${b.partitions.size}.")
+    debug(s"A${op.op}B: #partsA=${a.partitions.length},#partsB=${b.partitions.length}.")
 
     // Check if A and B are identically partitioned AND keyed. if they are, then just perform zip
     // instead of join, and apply the op map-side. Otherwise, perform join and apply the op
@@ -92,7 +91,7 @@ object AewB {
       a
         // Full outer-join operands row-wise
-        .cogroup(b, numPartitions = a.partitions.size max b.partitions.size)
+        .cogroup(b, numPartitions = a.partitions.length max b.partitions.length)
 
         // Reduce both sides. In case there are duplicate rows in RHS or LHS, they are summed up
         // prior to reduction.
@@ -177,7 +176,7 @@ object AewB {
       srcA.toBlockifiedDrmRdd(op.A.ncol)
     }
 
-    debug(s"A${op.op}$scalar: #parts=${aBlockRdd.partitions.size}.")
+    debug(s"A${op.op}$scalar: #parts=${aBlockRdd.partitions.length}.")
 
     val rdd = aBlockRdd
       .map {

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
index a2c86b3..bf0f903 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
@@ -32,7 +32,7 @@ object AinCoreB {
     implicit val ctx:DistributedContext = rddA.context
     val dg = drmBroadcast(op.right.viewDiagonal())
 
-    debug(s"operator A %*% inCoreB-diagonal. #parts=${rddA.partitions.size}.")
+    debug(s"operator A %*% inCoreB-diagonal. #parts=${rddA.partitions.length}.")
 
     val rdd = rddA
       // Just multiply the blocks
@@ -47,7 +47,7 @@ object AinCoreB {
     val rddA = srcA.asBlockified(op.A.ncol)
     implicit val sc:DistributedContext = rddA.sparkContext
 
-    debug(s"operator A %*% inCoreB. #parts=${rddA.partitions.size}.")
+    debug(s"operator A %*% inCoreB. #parts=${rddA.partitions.length}.")
 
     val bcastB = drmBroadcast(m = op.right)

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
index f7ad575..9dee9b5 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
@@ -19,7 +19,6 @@ package org.apache.mahout.sparkbindings.blas
 
 import reflect.ClassTag
 import collection._
-import JavaConversions._
 
 import org.apache.mahout.logging._
 import org.apache.mahout.math._
@@ -28,8 +27,6 @@ import org.apache.mahout.sparkbindings.drm._
 import org.apache.spark.rdd.RDD
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
-import org.apache.spark.SparkContext._
-import org.apache.log4j.Logger
 import org.apache.mahout.math.drm.logical.OpAtB
 
 import scala.collection.mutable.ArrayBuffer
@@ -62,15 +59,15 @@ object AtB {
     // Approximate number of final partitions. We take bigger partitions as our guide to number of
     // elements per partition. TODO: do it better.
 
    // Elements per partition, bigger of two operands.
-    val epp = aNRow.toDouble * prodNRow / rddA.partitions.size max aNRow.toDouble * prodNCol /
-      rddB.partitions.size
+    val epp = aNRow.toDouble * prodNRow / rddA.partitions.length max aNRow.toDouble * prodNCol /
+      rddB.partitions.length
 
     // Number of partitions we want to converge to in the product. For now we simply extrapolate that
     // assuming product density and operand densities being about the same; and using the same element
     // per partition number in the product as the bigger of two operands.
     val numProductPartitions = (prodNCol.toDouble * prodNRow / epp).ceil.toInt
 
-    if (log.isDebugEnabled) log.debug(s"AtB: #parts ${numProductPartitions} for $prodNRow x $prodNCol geometry.")
+    if (log.isDebugEnabled) log.debug(s"AtB: #parts $numProductPartitions for $prodNRow x $prodNCol geometry.")
 
     val zipped = if (zippable) {
@@ -105,15 +102,15 @@
     // Approximate number of final partitions. We take bigger partitions as our guide to number of
     // elements per partition. TODO: do it better.
 
    // Elements per partition, bigger of two operands.
-    val epp = aNRow.toDouble * prodNRow / rddA.partitions.size max aNRow.toDouble * prodNCol /
-      rddB.partitions.size
+    val epp = aNRow.toDouble * prodNRow / rddA.partitions.length max aNRow.toDouble * prodNCol /
+      rddB.partitions.length
 
     // Number of partitions we want to converge to in the product. For now we simply extrapolate that
     // assuming product density and operand densities being about the same; and using the same element
     // per partition number in the product as the bigger of two operands.
     val numProductPartitions = (prodNCol.toDouble * prodNRow / epp).ceil.toInt min prodNRow
 
-    if (log.isDebugEnabled) log.debug(s"AtB mmul: #parts ${numProductPartitions} for $prodNRow x $prodNCol geometry.")
+    if (log.isDebugEnabled) log.debug(s"AtB mmul: #parts $numProductPartitions for $prodNRow x $prodNCol geometry.")
 
     val zipped = if (zippable) {
@@ -141,7 +138,7 @@
       // Do full join. We can't get away with partial join because it is going to lose some rows
       // in case we have missing rows on either side.
-      .cogroup(other = rddB, numPartitions = rddA.partitions.size max rddB.partitions.size )
+      .cogroup(other = rddB, numPartitions = rddA.partitions.length max rddB.partitions.length )
 
       // Merge groups.
@@ -252,7 +249,7 @@ object AtB {
       // Produce keys
       .map { case (blockKey, block) ⇒ ranges(blockKey).toArray → block }
 
-    debug(s"A'B mmul #parts: ${rdd.partitions.size}.")
+    debug(s"A'B mmul #parts: ${rdd.partitions.length}.")
 
     rdd
   }
@@ -311,8 +308,8 @@ object AtB {
     // this point we need to split n-range of B' into suitable number of partitions.
 
     if (log.isDebugEnabled) {
-      log.debug(s"AtBZipped:zipped #parts ${zipped.partitions.size}")
-      log.debug(s"AtBZipped:Targeted #parts ${numPartitions}")
+      log.debug(s"AtBZipped:zipped #parts ${zipped.partitions.length}")
+      log.debug(s"AtBZipped:Targeted #parts $numPartitions")
     }
 
     // Figure out approximate block height per partition of the result.
@@ -353,7 +350,7 @@ object AtB {
         rowKeys -> block
     }
 
-    if (log.isDebugEnabled) log.debug(s"AtBZipped #parts ${rddBt.partitions.size}")
+    if (log.isDebugEnabled) log.debug(s"AtBZipped #parts ${rddBt.partitions.length}")
 
     rddBt
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
index e900749..f7ba496 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
@@ -25,7 +25,6 @@ import org.apache.mahout.math._
 import scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.drm.logical.{OpCbindScalar, OpCbind}
-import org.apache.spark.SparkContext._
 
 /** Physical cbind */
 object CbindAB {
@@ -95,7 +94,7 @@ object CbindAB {
       log.debug("applying cbind as join")
 
       a
-        .cogroup(b, numPartitions = a.partitions.size max b.partitions.size)
+        .cogroup(b, numPartitions = a.partitions.length max b.partitions.length)
         .map {
           case (key, (vectorSeqA, vectorSeqB)) =>

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
index 6104d83..296369a 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
@@ -17,7 +17,6 @@
 package org.apache.mahout.sparkbindings.blas
 
-import scala.reflect.ClassTag
 import org.apache.mahout.math.scalabindings._
 import RLikeOps._
 import org.apache.mahout.math.{SequentialAccessSparseVector, DenseVector}

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
index 7e48ed8..49de368 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
@@ -18,10 +18,8 @@
 package org.apache.mahout.sparkbindings.blas
 
 import org.apache.mahout.math.drm.logical.OpMapBlock
-import org.apache.mahout.sparkbindings.drm.DrmRddInput
-import org.apache.mahout.math.drm.BlockMapFunc
 import org.apache.mahout.math.scalabindings.RLikeOps._
-import scala.reflect.ClassTag
+import org.apache.mahout.sparkbindings.drm.DrmRddInput
 
 object MapBlock {

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
index 417fe24..d9d5037 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
@@ -1,14 +1,11 @@
 package org.apache.mahout.sparkbindings.blas
 
+import org.apache.mahout.logging._
+import org.apache.mahout.math.drm.logical.OpPar
 import org.apache.mahout.sparkbindings.drm
-
-import scala.reflect.ClassTag
 import org.apache.mahout.sparkbindings.drm.DrmRddInput
-import org.apache.mahout.math.drm.logical.OpPar
-import org.apache.spark.rdd.RDD
-import scala.math._
-import org.apache.mahout.logging._
+
+import scala.math._
 
 /** Physical adjustment of parallelism */
 object Par {
@@ -20,8 +17,8 @@ object Par {
     implicit val ktag = op.keyClassTag
 
     val srcBlockified = src.isBlockified
-    val srcRdd = if (srcBlockified) src.asBlockified(op.ncol) else src.asRowWise()
-    val srcNParts = srcRdd.partitions.size
+    val srcRdd = if (srcBlockified) src.toBlockifiedDrmRdd(op.ncol) else src.toDrmRdd()
+    val srcNParts = srcRdd.partitions.length
 
     // To what size?
     val targetParts = if (op.minSplits > 0) srcNParts max op.minSplits
@@ -37,17 +34,16 @@ object Par {
     if (targetParts > srcNParts) {
 
       // Expanding. Always requires deblockified stuff. May require re-shuffling.
-      val rdd = src.asRowWise().repartition(numPartitions = targetParts)
-
+      val rdd = src.toDrmRdd().repartition(numPartitions = targetParts)
       rdd
     } else if (targetParts < srcNParts) {
       // Shrinking.
 
       if (srcBlockified) {
-        drm.rbind(src.asBlockified(op.ncol).coalesce(numPartitions = targetParts))
+        drm.rbind(src.toBlockifiedDrmRdd(op.ncol).coalesce(numPartitions = targetParts))
       } else {
-        src.asRowWise().coalesce(numPartitions = targetParts)
+        src.toDrmRdd().coalesce(numPartitions = targetParts)
       }
     } else {
       // no adjustment required.

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
index d12a0d3..9bf47bd 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
@@ -18,7 +18,6 @@ package org.apache.mahout.sparkbindings.blas
 
 import org.apache.log4j.Logger
-import scala.reflect.ClassTag
 import org.apache.mahout.sparkbindings.drm.DrmRddInput
 import org.apache.mahout.math.drm.logical.OpRbind

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
index 5a83f80..8c4eef2 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
@@ -17,17 +17,14 @@
 package org.apache.mahout.sparkbindings
 
-import org.apache.mahout.sparkbindings
-import org.apache.spark.rdd.RDD
-
-import scala.reflect.ClassTag
-import org.apache.spark.SparkContext._
 import org.apache.mahout.math._
 import org.apache.mahout.math.drm._
-import scalabindings._
-import RLikeOps._
-import collection._
-import JavaConversions._
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
+import org.apache.spark.rdd.RDD
+
+import scala.collection.JavaConversions._
+import scala.collection._
 
 /**
  * This validation contains distributed algorithms that distributed matrix expression optimizer picks
@@ -60,7 +57,7 @@ package object blas {
       .collect()
 
     // Starting indices
-    var startInd = new Array[Int](rdd.partitions.size)
+    var startInd = new Array[Int](rdd.partitions.length)
 
     // Save counts
     for (pc <- partSizes) startInd(pc._1) = pc._2
@@ -123,7 +120,7 @@ package object blas {
     sc
       // Bootstrap full key set
-      .parallelize(0 until dueRows, numSlices = rdd.partitions.size max 1)
+      .parallelize(0 until dueRows, numSlices = rdd.partitions.length max 1)
 
       // Enable PairedFunctions
       .map(_ -> Unit)
@@ -137,7 +134,7 @@ package object blas {
       // Coalesce and output RHS
       .map { case (key, (seqUnit, seqVec)) =>
         val acc = seqVec.headOption.getOrElse(new SequentialAccessSparseVector(dueCols))
-        val vec = if (seqVec.size > 0) (acc /: seqVec.tail)(_ + _) else acc
+        val vec = if (seqVec.nonEmpty) (acc /: seqVec.tail)(_ + _) else acc
         key -> vec
       }

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index e369cf7..bd95fe0 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -22,12 +22,11 @@ import org.apache.mahout.math.drm.CacheHint.CacheHint
 import math._
 import scalabindings._
 import RLikeOps._
-import drm._
 import scala.collection.JavaConversions._
 import org.apache.spark.storage.StorageLevel
 import reflect._
 import scala.util.Random
-import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
+import org.apache.hadoop.io.{LongWritable, Text, IntWritable}
 import org.apache.mahout.math.drm._
 import org.apache.mahout.sparkbindings._
 
@@ -40,7 +39,6 @@
  * @param partitioningTag unique partitioning tag. Used to detect identically partitioned operands.
  * @param _canHaveMissingRows true if the matrix is int-keyed, and if it also may have missing rows
  *                            (will require a lazy fix for some physical operations.
- * @param evidence$1 class tag context bound for K.
 * @tparam K matrix key type (e.g. the keys of sequence files once persisted)
 */
 class CheckpointedDrmSpark[K: ClassTag](
@@ -187,7 +185,7 @@ class CheckpointedDrmSpark[K: ClassTag](
     // that nrow can be computed lazily, which always happens when rdd is already available, cached,
     // and it's ok to compute small summaries without triggering huge pipelines. Which usually
     // happens right after things like drmFromHDFS or drmWrap().
-    val maxPlus1 = rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L
+    val maxPlus1 = rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max) + 1L
     val rowCount = rdd.count()
     _canHaveMissingRows = maxPlus1 != rowCount ||
       rdd.map(_._1).sum().toLong != (rowCount * (rowCount - 1.0) / 2.0).toLong
@@ -202,8 +200,8 @@ class CheckpointedDrmSpark[K: ClassTag](
   protected def computeNCol = {
     rddInput.isBlockified match {
       case true ⇒ rddInput.toBlockifiedDrmRdd(throw new AssertionError("not reached"))
-        .map(_._2.ncol).reduce(max(_, _))
-      case false ⇒ cache().rddInput.toDrmRdd().map(_._2.length).fold(-1)(max(_, _))
+        .map(_._2.ncol).reduce(max)
+      case false ⇒ cache().rddInput.toDrmRdd().map(_._2.length).fold(-1)(max)
     }
   }

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
index 3c086fe..60dd850 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
@@ -2,7 +2,6 @@ package org.apache.mahout.sparkbindings.drm
 
 import org.apache.mahout.math.drm.CheckpointedDrm
 import org.apache.mahout.sparkbindings.DrmRdd
-import scala.reflect.ClassTag
 
 /** Additional Spark-specific operations. Requires underlying DRM to be running on Spark backend. */
 class CheckpointedDrmSparkOps[K](drm: CheckpointedDrm[K]) {

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
index b793098..64065d9 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/package.scala
@@ -17,25 +17,19 @@
 package org.apache.mahout.sparkbindings
 
-import org.apache.mahout.math._
-import org.apache.spark.SparkContext
-import scala.collection.JavaConversions._
-import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
 import org.apache.log4j.Logger
-import java.lang.Math
-import org.apache.spark.rdd.RDD
-import scala.reflect.ClassTag
+import org.apache.mahout.math
+import org.apache.mahout.math._
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.scalabindings.RLikeOps._
 import org.apache.mahout.math.scalabindings._
-import RLikeOps._
 import org.apache.spark.broadcast.Broadcast
-import org.apache.mahout.math.drm._
-import SparkContext._
-import org.apache.mahout.math
+
+import scala.reflect.ClassTag
 
 package object drm {
 
-  private[drm] final val log = Logger.getLogger("org.apache.mahout.sparkbindings");
+  private[drm] final val log = Logger.getLogger("org.apache.mahout.sparkbindings")
 
   private[sparkbindings] implicit def cpDrm2DrmRddInput[K](cp: CheckpointedDrmSpark[K]): DrmRddInput[K] = cp.rddInput
 
@@ -67,15 +61,15 @@ package object drm {
       val vectors = data.map(t => t._2).toArray
       val block = if (vectors(0).isDense) {
-        val block = new DenseMatrix(vectors.size, blockncol)
+        val block = new DenseMatrix(vectors.length, blockncol)
         var row = 0
-        while (row < vectors.size) {
+        while (row < vectors.length) {
           block(row, ::) := vectors(row)
           row += 1
         }
         block
       } else {
-        new SparseRowMatrix(vectors.size, blockncol, vectors, true, false)
+        new SparseRowMatrix(vectors.length, blockncol, vectors, true, false)
       }
 
       Iterator(keys -> block)
@@ -99,7 +93,7 @@ package object drm {
     rdd.flatMap {
       case (blockKeys: Array[K], block: Matrix) =>
 
-        blockKeys.ensuring(blockKeys.size == block.nrow)
+        blockKeys.ensuring(blockKeys.length == block.nrow)
         blockKeys.view.zipWithIndex.map {
           case (key, idx) =>
             val v = block(idx, ::) // This is just a view!

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
index 5330581..acca75e 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala
@@ -32,8 +32,6 @@ import collection._
 import collection.generic.Growable
 import scala.reflect.ClassTag
 
-
-
 /** Public api for Spark-specific operators */
 package object sparkbindings {
 
@@ -191,7 +189,7 @@ package object sparkbindings {
           "defined?")
 
         val j = cp.split(File.pathSeparatorChar)
-        if (j.size > 10) {
+        if (j.length > 10) {
           // assume this is a valid classpath line
           jars ++= j
           continue = false

http://git-wip-us.apache.org/repos/asf/mahout/blob/a168d238/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
index 7241660..3af9af7 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOpsSuite.scala
@@ -17,14 +17,12 @@
 package org.apache.mahout.sparkbindings.drm
 
-import org.apache.mahout.math._
-import scalabindings._
-import drm._
-import RLikeOps._
-import RLikeDrmOps._
+import org.apache.mahout.math.drm.RLikeDrmOps._
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.scalabindings._
 import org.apache.mahout.sparkbindings._
-import org.scalatest.{ConfigMap, BeforeAndAfterAllConfigMap, FunSuite}
 import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
+import org.scalatest.FunSuite
 
 import scala.reflect.ClassTag
 
@@ -35,19 +33,19 @@ class DrmLikeOpsSuite extends FunSuite with DistributedSparkSuite with DrmLikeOp
     val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6))
     val A = drmParallelize(m = inCoreA, numPartitions = 2)
 
-    A.rdd.partitions.size should equal(2)
+    A.rdd.partitions.length should equal(2)
 
-    (A + 1.0).par(exact = 4).rdd.partitions.size should equal(4)
-    A.par(exact = 2).rdd.partitions.size should equal(2)
-    A.par(exact = 1).rdd.partitions.size should equal(1)
+    (A + 1.0).par(exact = 4).rdd.partitions.length should equal(4)
+    A.par(exact = 2).rdd.partitions.length should equal(2)
+    A.par(exact = 1).rdd.partitions.length should equal(1)
 
-    A.par(min = 4).rdd.partitions.size should equal(4)
-    A.par(min = 2).rdd.partitions.size should equal(2)
-    A.par(min = 1).rdd.partitions.size should equal(2)
-    A.par(auto = true).rdd.partitions.size should equal(10)
-    A.par(exact = 10).par(auto = true).rdd.partitions.size should equal(10)
-    A.par(exact = 11).par(auto = true).rdd.partitions.size should equal(19)
-    A.par(exact = 20).par(auto = true).rdd.partitions.size should equal(19)
+    A.par(min = 4).rdd.partitions.length should equal(4)
+    A.par(min = 2).rdd.partitions.length should equal(2)
+    A.par(min = 1).rdd.partitions.length should equal(2)
+    A.par(auto = true).rdd.partitions.length should equal(10)
+    A.par(exact = 10).par(auto = true).rdd.partitions.length should equal(10)
+    A.par(exact = 11).par(auto = true).rdd.partitions.length should equal(19)
+    A.par(exact = 20).par(auto = true).rdd.partitions.length should equal(19)
 
     A.keyClassTag shouldBe ClassTag.Int
     A.par(auto = true).keyClassTag shouldBe ClassTag.Int
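
----------------------------------------------------------------------

Most of the churn above is two mechanical Scala cleanups: dropping scala.reflect.ClassTag imports from files that no longer mention the type, and preferring length/until/indices/nonEmpty over their more roundabout spellings (0 to n - 1, .size on arrays, .size > 0). The sketch below illustrates the ClassTag pattern the DRM classes rely on; it is a minimal standalone example with made-up names (KeyedBlock, ClassTagSketch), not Mahout code:

    import scala.reflect.{ClassTag, classTag}

    // A context bound [K: ClassTag] has the compiler pass runtime class
    // evidence for K; only files that actually name ClassTag need the import.
    case class KeyedBlock[K: ClassTag](keys: Array[K]) {
      // classTag[K] recovers the implicit evidence captured by the bound.
      def keyClass: Class[_] = classTag[K].runtimeClass
    }

    object ClassTagSketch extends App {
      val block = KeyedBlock(Array(1, 2, 3))
      println(block.keyClass)                          // prints "int"

      // The collection idioms adopted throughout this commit, side by side:
      val xs = Array(10, 20, 30)
      for (i <- 0 until xs.length) print(s"${xs(i)} ") // rather than 0 to (xs.length - 1)
      for (i <- xs.indices) print(s"${xs(i)} ")        // tighter still
      println(xs.nonEmpty)                             // rather than xs.size > 0
    }

The CheckpointAction hunk is the one place the change is more than import hygiene: it stops materializing keyClassTag via classTag[K] in the abstract class, which is what lets the scala.reflect.{ClassTag, classTag} import disappear there.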
