WIP, Mahout-Flink Integration, adding missing methods; code refactoring
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/38d08085 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/38d08085 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/38d08085 Branch: refs/heads/flink-binding Commit: 38d0808523800a4369b18251e58b04d61771baf5 Parents: 0c6351f Author: smarthi <[email protected]> Authored: Mon Oct 26 20:59:42 2015 -0700 Committer: smarthi <[email protected]> Committed: Mon Oct 26 20:59:42 2015 -0700 ---------------------------------------------------------------------- .../mahout/flinkbindings/FlinkEngine.scala | 18 +++++++- .../drm/CheckpointedFlinkDrmOps.scala | 35 +++++++++++++++ .../mahout/flinkbindings/drm/FlinkDrm.scala | 20 +++++---- .../apache/mahout/flinkbindings/package.scala | 45 +++++++++----------- .../drm/CheckpointedDrmSpark.scala | 25 ++++++----- .../drm/CheckpointedDrmSparkOps.scala | 19 +++++++++ .../apache/mahout/sparkbindings/package.scala | 4 +- 7 files changed, 116 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala index 5915c0a..0bc12aa 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala @@ -20,6 +20,8 @@ package org.apache.mahout.flinkbindings import java.util.Collection +import org.apache.flink.api.java.utils.DataSetUtils + import scala.collection.JavaConverters._ import scala.collection.JavaConversions._ import scala.reflect.ClassTag @@ -64,9 +66,10 @@ object FlinkEngine extends DistributedEngine { implicit val env = 
dc.asInstanceOf[FlinkDistributedContext].env val metadata = hdfsUtils.readDrmHeader(path) + println(metadata) val unwrapKey = metadata.unwrapKeyFunction - + println(unwrapKey) val dataset = env.readHadoopFile(new SequenceFileInputFormat[Writable, VectorWritable], classOf[Writable], classOf[VectorWritable], path) @@ -221,7 +224,9 @@ object FlinkEngine extends DistributedEngine { /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */ override def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1) (implicit dc: DistributedContext): CheckpointedDrm[Int] = { + val parallelDrm = parallelize(m, numPartitions) + new CheckpointedFlinkDrm(ds=parallelDrm, _nrow=m.numRows(), _ncol=m.numCols()) } @@ -276,6 +281,17 @@ object FlinkEngine extends DistributedEngine { def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = ??? +// def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples:Int, replacement: Boolean = false): Matrix = { +// +// val ncol = drmX match { +// case cp: CheckpointedFlinkDrm[K] ⇒ cp.ncol +// case _ ⇒ -1 +// } +// +// val sample = DataSetUtils.sampleWithSize(drmX.dataset, replacement, numSamples) +// +// } + /** Optional engine-specific all reduce tensor operation. 
*/ def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix = throw new UnsupportedOperationException("the operation allreduceBlock is not yet supported on Flink") http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala new file mode 100644 index 0000000..a037d44 --- /dev/null +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrmOps.scala @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.mahout.flinkbindings.drm + +import org.apache.mahout.math.drm.CheckpointedDrm + +import scala.reflect.ClassTag + +class CheckpointedFlinkDrmOps[K: ClassTag](drm: CheckpointedDrm[K]) { + assert(drm.isInstanceOf[CheckpointedFlinkDrm[K]], "must be a Flink-backed matrix") + + private[flinkbindings] val flinkDrm = drm.asInstanceOf[CheckpointedFlinkDrm[K]] + + /** Flink matrix customization exposure */ + def dataset = flinkDrm.ds + + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala index 4a16724..dbc6b11 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala @@ -65,15 +65,17 @@ class RowsFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val ncol: Int) extends Fl val it = values.asScala.seq val (keys, vectors) = it.unzip - val isDense = vectors.head.isDense - - if (isDense) { - val matrix = new DenseMatrix(vectors.size, ncolLocal) - vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx, ::) := vec } - out.collect((keys.toArray(classTag), matrix)) - } else { - val matrix = new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray) - out.collect((keys.toArray(classTag), matrix)) + if (vectors.nonEmpty) { + val isDense = vectors.head.isDense + + if (isDense) { + val matrix = new DenseMatrix(vectors.size, ncolLocal) + vectors.zipWithIndex.foreach { case (vec, idx) => matrix(idx, ::) := vec } + out.collect((keys.toArray(classTag), matrix)) + } else { + val matrix = new SparseRowMatrix(vectors.size, ncolLocal, vectors.toArray) + out.collect((keys.toArray(classTag), matrix)) + } } } }) 
http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala index c77a551..656b8de 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala @@ -18,34 +18,19 @@ */ package org.apache.mahout +import org.apache.flink.api.common.functions.{FilterFunction, MapFunction} +import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} +import org.apache.mahout.flinkbindings.drm.{CheckpointedFlinkDrmOps, CheckpointedFlinkDrm, FlinkDrm, RowsFlinkDrm} +import org.apache.mahout.math.{DenseVector, Matrix, MatrixWritable, Vector, VectorWritable} +import org.apache.mahout.math.drm.{BlockifiedDrmTuple, CheckpointedDrm, DistributedContext, DrmTuple, _} +import org.slf4j.LoggerFactory + import scala.Array._ import scala.reflect.ClassTag -import org.apache.flink.api.common.functions.FilterFunction -import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.java.DataSet -import org.apache.flink.api.java.ExecutionEnvironment -import org.apache.mahout.flinkbindings._ -import org.apache.mahout.flinkbindings.FlinkDistributedContext -import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm -import org.apache.mahout.flinkbindings.drm.FlinkDrm -import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm -import org.apache.mahout.math._ -import org.apache.mahout.math.DenseVector -import org.apache.mahout.math.Matrix -import org.apache.mahout.math.MatrixWritable -import org.apache.mahout.math.Vector -import org.apache.mahout.math.VectorWritable -import org.apache.mahout.math.drm._ -import org.apache.mahout.math.drm.BlockifiedDrmTuple -import org.apache.mahout.math.drm.CheckpointedDrm 
-import org.apache.mahout.math.drm.DistributedContext -import org.apache.mahout.math.drm.DrmTuple -import org.slf4j.LoggerFactory -import org.apache.mahout.math.drm.logical.CheckpointAction package object flinkbindings { - private[flinkbindings] val log = LoggerFactory.getLogger("apache.org.mahout.flinkbingings") + private[flinkbindings] val log = LoggerFactory.getLogger("apache.org.mahout.flinkbindings") /** Row-wise organized DRM dataset type */ type DrmDataSet[K] = DataSet[DrmTuple[K]] @@ -64,18 +49,28 @@ package object flinkbindings { implicit def wrapContext(env: ExecutionEnvironment): FlinkDistributedContext = new FlinkDistributedContext(env) + implicit def unwrapContext(ctx: FlinkDistributedContext): ExecutionEnvironment = ctx.env - private[flinkbindings] implicit def castCheckpointedDrm[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedFlinkDrm[K] = { + private[flinkbindings] implicit def castCheckpointedDrm[K: ClassTag](drm: CheckpointedDrm[K]) + : CheckpointedFlinkDrm[K] = { + assert(drm.isInstanceOf[CheckpointedFlinkDrm[K]], "it must be a Flink-backed matrix") drm.asInstanceOf[CheckpointedFlinkDrm[K]] } - implicit def checkpointeDrmToFlinkDrm[K: ClassTag](cp: CheckpointedDrm[K]): FlinkDrm[K] = { + implicit def checkpointedDrmToFlinkDrm[K: ClassTag](cp: CheckpointedDrm[K]): FlinkDrm[K] = { val flinkDrm = castCheckpointedDrm(cp) new RowsFlinkDrm[K](flinkDrm.ds, flinkDrm.ncol) } + /** Adding Spark-specific ops */ + implicit def cpDrm2cpDrmFlinkOps[K: ClassTag](drm: CheckpointedDrm[K]): CheckpointedFlinkDrmOps[K] = + new CheckpointedFlinkDrmOps[K](drm) + + implicit def drm2cpDrmFlinkOps[K: ClassTag](drm: DrmLike[K]): CheckpointedFlinkDrmOps[K] = drm: CheckpointedDrm[K] + + private[flinkbindings] implicit def wrapAsWritable(m: Matrix): MatrixWritable = new MatrixWritable(m) private[flinkbindings] implicit def wrapAsWritable(v: Vector): VectorWritable = new VectorWritable(v) private[flinkbindings] implicit def unwrapFromWritable(w: MatrixWritable): Matrix = 
w.get() http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala index 38007e0..857cca0 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala @@ -17,18 +17,18 @@ package org.apache.mahout.sparkbindings.drm +import org.apache.hadoop.io.{IntWritable, LongWritable, Text} import org.apache.mahout.math._ -import math._ -import scalabindings._ -import RLikeOps._ -import drm._ -import scala.collection.JavaConversions._ -import org.apache.spark.storage.StorageLevel -import reflect._ -import scala.util.Random -import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable} import org.apache.mahout.math.drm._ +import org.apache.mahout.math.scalabindings.RLikeOps._ +import org.apache.mahout.math.scalabindings._ import org.apache.mahout.sparkbindings._ +import org.apache.spark.storage.StorageLevel + +import scala.collection.JavaConversions._ +import scala.math._ +import scala.reflect._ +import scala.util.Random /** ==Spark-specific optimizer-checkpointed DRM.== * @@ -39,7 +39,6 @@ import org.apache.mahout.sparkbindings._ * @param partitioningTag unique partitioning tag. Used to detect identically partitioned operands. * @param _canHaveMissingRows true if the matrix is int-keyed, and if it also may have missing rows * (will require a lazy fix for some physical operations. - * @param evidence$1 class tag context bound for K. * @tparam K matrix key type (e.g. 
the keys of sequence files once persisted) */ class CheckpointedDrmSpark[K: ClassTag]( @@ -182,7 +181,7 @@ class CheckpointedDrmSpark[K: ClassTag]( // that nrow can be computed lazily, which always happens when rdd is already available, cached, // and it's ok to compute small summaries without triggering huge pipelines. Which usually // happens right after things like drmFromHDFS or drmWrap(). - val maxPlus1 = rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L + val maxPlus1 = rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max) + 1L val rowCount = rdd.count() _canHaveMissingRows = maxPlus1 != rowCount || rdd.map(_._1).sum().toLong != (rowCount * (rowCount - 1.0) / 2.0).toLong @@ -197,8 +196,8 @@ class CheckpointedDrmSpark[K: ClassTag]( protected def computeNCol = { rddInput.isBlockified match { case true ⇒ rddInput.asBlockified(throw new AssertionError("not reached")) - .map(_._2.ncol).reduce(max(_, _)) - case false ⇒ cache().rddInput.asRowWise().map(_._2.length).fold(-1)(max(_, _)) + .map(_._2.ncol).reduce(max) + case false ⇒ cache().rddInput.asRowWise().map(_._2.length).fold(-1)(max) } } http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala index 25953e1..0a1757a 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala @@ -1,3 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.mahout.sparkbindings.drm import org.apache.mahout.math.drm.CheckpointedDrm http://git-wip-us.apache.org/repos/asf/mahout/blob/38d08085/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala index 330ae38..91ad47d 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/package.scala @@ -182,7 +182,7 @@ package object sparkbindings { val w = new StringWriter() closeables += w - var continue = true; + var continue = true val jars = new mutable.ArrayBuffer[String]() do { val cp = r.readLine() @@ -230,7 +230,7 @@ package object sparkbindings { if (!part1Req) warn("blockified rdd: condition not met: exactly 1 per partition") - return part1Req + part1Req } }
