Repository: mahout
Updated Branches:
  refs/heads/flink-binding a168d238d -> 92a2f6c8f
Merge changes from master


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/92a2f6c8
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/92a2f6c8
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/92a2f6c8

Branch: refs/heads/flink-binding
Commit: 92a2f6c8ffa02478e7cc8a4b79ff270bf4d08c8a
Parents: a168d23
Author: Dmitriy Lyubimov <[email protected]>
Authored: Sat Oct 24 20:26:56 2015 -0700
Committer: smarthi <[email protected]>
Committed: Sat Mar 12 22:20:29 2016 -0500

----------------------------------------------------------------------
 flink/pom.xml                                      |  4 ++--
 .../apache/mahout/sparkbindings/SparkEngine.scala  |  2 +-
 .../apache/mahout/sparkbindings/blas/AewB.scala    | 12 ++++++------
 .../mahout/sparkbindings/blas/CbindAB.scala        |  6 +++---
 .../mahout/sparkbindings/blas/MapBlock.scala       |  2 +-
 .../apache/mahout/sparkbindings/blas/Par.scala     |  9 +++++----
 .../sparkbindings/drm/CheckpointedDrmSpark.scala   | 18 +++++++++---------
 .../drm/CheckpointedDrmSparkOps.scala              |  2 +-
 8 files changed, 28 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/flink/pom.xml
----------------------------------------------------------------------
diff --git a/flink/pom.xml b/flink/pom.xml
index 1aafba5..37f1dbf 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -110,7 +110,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.flink</groupId>
-      <artifactId>flink-scala</artifactId>
+      <artifactId>flink-scala_2.10</artifactId>
       <version>${flink.version}</version>
     </dependency>
     <dependency>
@@ -120,7 +120,7 @@
     </dependency>
     <dependency>
      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-clients</artifactId>
+      <artifactId>flink-clients_2.10</artifactId>
       <version>${flink.version}</version>
     </dependency>
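[Note: the new _2.10 suffixes pin these dependencies to Flink's Scala 2.10
builds; Flink publishes a separate artifact per Scala binary version. For
comparison only, an sbt build expresses the same pinning with %%, which
appends the suffix automatically. This is a sketch; flinkVersion is a
placeholder, not something set by this patch.

  // build.sbt sketch: %% expands "flink-scala" to "flink-scala_2.10" on Scala 2.10
  val flinkVersion = "0.10.0"  // placeholder version
  libraryDependencies ++= Seq(
    "org.apache.flink" %% "flink-scala"   % flinkVersion,
    "org.apache.flink" %% "flink-clients" % flinkVersion
  )
]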
http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 3200288..b89235d 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -97,7 +97,7 @@ object SparkEngine extends DistributedEngine {
 
   /** Optional engine-specific all reduce tensor operation.
     */
   override def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix = {
-    drm.toBlockifiedDrmRdd(ncol = drm.ncol).map(bmf(_)).reduce(rf)
+    drm.asBlockified(ncol = drm.ncol).map(bmf(_)).reduce(rf)
   }
 
   /**
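[Note: a usage sketch only. The engine method above maps each (keys, block)
tuple of the blockified RDD to a Matrix and reduces the partial results, so
the Gramian A'A can be accumulated block-wise. drmA is an assumed
CheckpointedDrm[Int]; the imports are the usual Mahout scalabindings, not
introduced by this patch.

  import org.apache.mahout.math.Matrix
  import org.apache.mahout.math.drm._
  import org.apache.mahout.math.scalabindings.RLikeOps._
  import org.apache.mahout.sparkbindings._

  // Per-block partial product, then an in-place sum of the partials.
  val bmf: BlockMapFunc2[Int] = { case (_, block) => block.t %*% block }
  val rf: BlockReduceFunc = (m1, m2) => m1 += m2
  val ata: Matrix = SparkEngine.allreduceBlock(drmA, bmf, rf)
]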
http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
index 309742f..92c429f 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
@@ -66,8 +66,8 @@ object AewB {
       case default => throw new IllegalArgumentException("Unsupported elementwise operator:%s.".format(opId))
     }
 
-    val a = srcA.toDrmRdd()
-    val b = srcB.toDrmRdd()
+    val a = srcA.asRowWise()
+    val b = srcB.asRowWise()
 
     debug(s"A${op.op}B: #partsA=${a.partitions.length},#partsB=${b.partitions.length}.")
 
@@ -120,10 +120,10 @@ object AewB {
     // Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing
     // rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar.
     val aBlockRdd = if (classTag[K] == ClassTag.Int && op.A.canHaveMissingRows && evalZeros) {
-      val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
+      val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.asRowWise().asInstanceOf[DrmRdd[Int]])
       drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
     } else {
-      srcA.toBlockifiedDrmRdd(op.A.ncol)
+      srcA.asBlockified(op.A.ncol)
     }
 
     val rdd = aBlockRdd.map {case (keys, block) =>
@@ -170,10 +170,10 @@ object AewB {
 
     // Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing
     // rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar.
     val aBlockRdd = if (classTag[K] == ClassTag.Int && op.A.canHaveMissingRows) {
-      val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
+      val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.asRowWise().asInstanceOf[DrmRdd[Int]])
       drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
     } else {
-      srcA.toBlockifiedDrmRdd(op.A.ncol)
+      srcA.asBlockified(op.A.ncol)
     }
 
     debug(s"A${op.op}$scalar: #parts=${aBlockRdd.partitions.length}.")

http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
index f7ba496..9f34b06 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
@@ -34,7 +34,7 @@ object CbindAB {
   def cbindAScalar[K](op: OpCbindScalar[K], srcA:DrmRddInput[K]) : DrmRddInput[K] = {
     implicit val ktag = op.keyClassTag
 
-    val srcRdd = srcA.toDrmRdd()
+    val srcRdd = srcA.asRowWise()
     val ncol = op.A.ncol
     val x = op.x
 
@@ -63,8 +63,8 @@ object CbindAB {
   def cbindAB_nograph[K](op: OpCbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
 
-    val a = srcA.toDrmRdd()
-    val b = srcB.toDrmRdd()
+    val a = srcA.asRowWise()
+    val b = srcB.asRowWise()
     val n = op.ncol
     val n1 = op.A.ncol
     val n2 = n - n1

http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
index 49de368..1caa537 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
@@ -30,7 +30,7 @@ object MapBlock {
     val bmf = operator.bmf
     val ncol = operator.ncol
     implicit val rtag = operator.keyClassTag
-    src.toBlockifiedDrmRdd(operator.A.ncol).map(blockTuple => {
+    src.asBlockified(operator.A.ncol).map(blockTuple => {
       val out = bmf(blockTuple)
 
       assert(out._2.nrow == blockTuple._2.nrow, "block mapping must return same number of rows.")
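[Note: the assert above is what fails when a mapBlock closure changes a
block's row count. A minimal sketch of the user-level operator this code
executes, assuming an existing drmA and the standard Samsara implicits;
nothing here is taken verbatim from this patch.

  import org.apache.mahout.math.drm.RLikeDrmOps._
  import org.apache.mahout.math.scalabindings.RLikeOps._

  // Double every element; keys and row count are unchanged, so the assert holds.
  val drmB = drmA.mapBlock() { case (keys, block) => keys -> (block * 2) }
]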
http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
index d9d5037..974c8db 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
@@ -17,7 +17,7 @@ object Par {
     implicit val ktag = op.keyClassTag
 
     val srcBlockified = src.isBlockified
-    val srcRdd = if (srcBlockified) src.toBlockifiedDrmRdd(op.ncol) else src.toDrmRdd()
+    val srcRdd = if (srcBlockified) src.asBlockified(op.ncol) else src.asRowWise()
     val srcNParts = srcRdd.partitions.length
 
     // To what size?
@@ -34,16 +34,17 @@ object Par {
     if (targetParts > srcNParts) {
 
       // Expanding. Always requires deblockified stuff. May require re-shuffling.
-      val rdd = src.toDrmRdd().repartition(numPartitions = targetParts)
+      val rdd = src.asRowWise().repartition(numPartitions = targetParts)
+
+      rdd
 
     } else if (targetParts < srcNParts) {
       // Shrinking.
 
       if (srcBlockified) {
-        drm.rbind(src.toBlockifiedDrmRdd(op.ncol).coalesce(numPartitions = targetParts))
+        drm.rbind(src.asBlockified(op.ncol).coalesce(numPartitions = targetParts))
       } else {
-        src.toDrmRdd().coalesce(numPartitions = targetParts)
+        src.asRowWise().coalesce(numPartitions = targetParts)
       }
     } else {
       // no adjustment required.
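[Note: the expand/shrink split above follows standard Spark semantics; a
minimal illustration on an arbitrary rdd (a placeholder, not from this
patch):

  // Widening the partition count always shuffles; narrowing can avoid it.
  val grown  = rdd.repartition(numPartitions = 16)
  val shrunk = rdd.coalesce(numPartitions = 4)  // narrow dependency, no shuffle by default
]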
http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index bd95fe0..71755c5 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -125,7 +125,7 @@ class CheckpointedDrmSpark[K: ClassTag](
     // since currently spark #collect() requires Serializeable support,
     // we serialize DRM vectors into byte arrays on backend and restore Vector
     // instances on the front end:
-    val data = rddInput.toDrmRdd().map(t => (t._1, t._2)).collect()
+    val data = rddInput.asRowWise().map(t => (t._1, t._2)).collect()
 
     val m = if (data.forall(_._2.isDense))
@@ -162,13 +162,13 @@ class CheckpointedDrmSpark[K: ClassTag](
 
     // Map backing RDD[(K,Vector)] to RDD[(K)Writable,VectorWritable)] and save.
     if (ktag.runtimeClass == classOf[Int]) {
-      rddInput.toDrmRdd()
+      rddInput.asRowWise()
         .map( x => (new IntWritable(x._1.asInstanceOf[Int]), new VectorWritable(x._2))).saveAsSequenceFile(path)
     } else if (ktag.runtimeClass == classOf[String]){
-      rddInput.toDrmRdd()
+      rddInput.asRowWise()
         .map( x => (new Text(x._1.asInstanceOf[String]), new VectorWritable(x._2))).saveAsSequenceFile(path)
     } else if (ktag.runtimeClass == classOf[Long]) {
-      rddInput.toDrmRdd()
+      rddInput.asRowWise()
         .map( x => (new LongWritable(x._1.asInstanceOf[Long]), new VectorWritable(x._2))).saveAsSequenceFile(path)
     } else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag))
@@ -179,7 +179,7 @@ class CheckpointedDrmSpark[K: ClassTag](
     val intRowIndex = classTag[K] == classTag[Int]
 
     if (intRowIndex) {
-      val rdd = cache().rddInput.toDrmRdd().asInstanceOf[DrmRdd[Int]]
+      val rdd = cache().rddInput.asRowWise().asInstanceOf[DrmRdd[Int]]
 
       // I guess it is a suitable place to compute int keys consistency test here because we know
       // that nrow can be computed lazily, which always happens when rdd is already available, cached,
@@ -192,21 +192,21 @@ class CheckpointedDrmSpark[K: ClassTag](
       intFixExtra = (maxPlus1 - rowCount) max 0L
       maxPlus1
     } else
-      cache().rddInput.toDrmRdd().count()
+      cache().rddInput.asRowWise().count()
   }
 
   protected def computeNCol = {
     rddInput.isBlockified match {
-      case true ⇒ rddInput.toBlockifiedDrmRdd(throw new AssertionError("not reached"))
+      case true ⇒ rddInput.asBlockified(throw new AssertionError("not reached"))
         .map(_._2.ncol).reduce(max)
-      case false ⇒ cache().rddInput.toDrmRdd().map(_._2.length).fold(-1)(max)
+      case false ⇒ cache().rddInput.asRowWise().map(_._2.length).fold(-1)(max(_, _))
     }
   }
 
   protected def computeNNonZero =
-    cache().rddInput.toDrmRdd().map(_._2.getNumNonZeroElements.toLong).sum().toLong
+    cache().rddInput.asRowWise().map(_._2.getNumNonZeroElements.toLong).sum().toLong
 
   /** Changes the number of rows in the DRM without actually touching the underlying data. Used to
     * redimension a DRM after it has been created, which implies some blank, non-existent rows.

http://git-wip-us.apache.org/repos/asf/mahout/blob/92a2f6c8/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
index 60dd850..e745a24 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
@@ -11,6 +11,6 @@ class CheckpointedDrmSparkOps[K](drm: CheckpointedDrm[K]) {
   private[sparkbindings] val sparkDrm = drm.asInstanceOf[CheckpointedDrmSpark[K]]
 
   /** Spark matrix customization exposure */
-  def rdd:DrmRdd[K] = sparkDrm.rddInput.toDrmRdd()
+  def rdd = sparkDrm.rddInput.asRowWise()
 }
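[Note: a hedged sketch of what this exposure enables from user code,
assuming the implicit decoration of CheckpointedDrm with
CheckpointedDrmSparkOps from the sparkbindings package object; drmA is a
placeholder CheckpointedDrm[Int].

  import org.apache.mahout.sparkbindings._

  // Drop from the DRM abstraction to the backing row-wise RDD[(K, Vector)].
  val rows: DrmRdd[Int] = drmA.rdd
  println(s"row RDD has ${rows.partitions.length} partitions")
]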
