Repository: mahout Updated Branches: refs/heads/flink-binding 54f51de82 -> af015ece7
WIP, migrating to Flink 0.10 and the Flink Scala API Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/e2ab67f2 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/e2ab67f2 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/e2ab67f2 Branch: refs/heads/flink-binding Commit: e2ab67f2629a6c49c4d8e911274b409c2b57101c Parents: 54f51de Author: smarthi <[email protected]> Authored: Tue Nov 10 19:51:20 2015 -0500 Committer: smarthi <[email protected]> Committed: Tue Nov 10 19:51:20 2015 -0500 ---------------------------------------------------------------------- .../mahout/flinkbindings/DataSetOps.scala | 89 +++++++++----------- .../mahout/flinkbindings/FlinkByteBCast.scala | 4 +- .../flinkbindings/FlinkDistributedContext.scala | 2 +- .../mahout/flinkbindings/FlinkEngine.scala | 32 +++---- .../mahout/flinkbindings/blas/FlinkOpAewB.scala | 8 +- .../flinkbindings/blas/FlinkOpAewScalar.scala | 16 ++-- .../mahout/flinkbindings/blas/FlinkOpAt.scala | 10 +-- .../mahout/flinkbindings/blas/FlinkOpAtA.scala | 6 +- .../mahout/flinkbindings/blas/FlinkOpAtB.scala | 10 +-- .../mahout/flinkbindings/blas/FlinkOpAx.scala | 15 ++-- .../flinkbindings/blas/FlinkOpCBind.scala | 11 +-- .../flinkbindings/blas/FlinkOpRBind.scala | 2 +- .../blas/FlinkOpTimesRightMatrix.scala | 4 +- .../drm/CheckpointedFlinkDrm.scala | 45 ++++------ .../mahout/flinkbindings/drm/FlinkDrm.scala | 24 ++---- .../apache/mahout/flinkbindings/package.scala | 13 +-- .../flinkbindings/DistributedFlinkSuite.scala | 3 +- .../flinkbindings/examples/ReadCsvExample.scala | 2 +- 18 files changed, 129 insertions(+), 167 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala index 4f437ae..2387d4b 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/DataSetOps.scala @@ -18,61 +18,50 @@ */ package org.apache.mahout.flinkbindings -import java.lang.Iterable -import java.util.Collections -import java.util.Comparator -import scala.collection.JavaConverters._ -import org.apache.flink.util.Collector -import org.apache.flink.api.java.DataSet -import org.apache.flink.api.java.tuple.Tuple2 -import org.apache.flink.api.common.functions.RichMapPartitionFunction -import org.apache.flink.configuration.Configuration -import scala.reflect.ClassTag - - -class DataSetOps[K: ClassTag](val ds: DataSet[K]) { +//@Deprecated +//class DataSetOps[K: ClassTag](val ds: DataSet[K]) { /** * Implementation taken from http://stackoverflow.com/questions/30596556/zipwithindex-on-apache-flink * * TODO: remove when FLINK-2152 is committed and released */ - def zipWithIndex(): DataSet[(Int, K)] = { - - // first for each partition count the number of elements - to calculate the offsets - val counts = ds.mapPartition(new RichMapPartitionFunction[K, (Int, Int)] { - override def mapPartition(values: Iterable[K], out: Collector[(Int, Int)]): Unit = { - val cnt: Int = values.asScala.count(_ => true) - val subtaskIdx = getRuntimeContext.getIndexOfThisSubtask - out.collect((subtaskIdx, cnt)) - } - }) +// def zipWithIndex(): DataSet[(Int, K)] = { +// +// first for each partition count the number of elements - to calculate the offsets +// val counts = ds.mapPartition(new RichMapPartitionFunction[K, (Int, Int)] { +// override def mapPartition(values: Iterable[K], out: Collector[(Int, Int)]): Unit = { +// val cnt: Int = values.asScala.count(_ => true) +// val subtaskIdx = getRuntimeContext.getIndexOfThisSubtask +// out.collect((subtaskIdx, cnt)) +// } +// }) // then use the offsets to index items of each partition - val zipped = ds.mapPartition(new RichMapPartitionFunction[K, (Int, K)] { - var offset: Int = 0 - - override def open(parameters: Configuration): Unit = { - val offsetsJava: java.util.List[(Int, Int)] = - getRuntimeContext.getBroadcastVariable("counts") - val offsets = offsetsJava.asScala - - val sortedOffsets = - offsets sortBy { case (id, _) => id } map { case (_, cnt) => cnt } - - val subtaskId = getRuntimeContext.getIndexOfThisSubtask - offset = sortedOffsets.take(subtaskId).sum.toInt - } - - override def mapPartition(values: Iterable[K], out: Collector[(Int, K)]): Unit = { - val it = values.asScala - it.zipWithIndex.foreach { case (value, idx) => - out.collect((idx + offset, value)) - } - } - }).withBroadcastSet(counts, "counts"); - - zipped - } - -} \ No newline at end of file +// val zipped = ds.mapPartition(new RichMapPartitionFunction[K, (Int, K)] { +// var offset: Int = 0 +// +// override def open(parameters: Configuration): Unit = { +// val offsetsJava: java.util.List[(Int, Int)] = +// getRuntimeContext.getBroadcastVariable("counts") +// val offsets = offsetsJava.asScala +// +// val sortedOffsets = +// offsets sortBy { case (id, _) => id } map { case (_, cnt) => cnt } +// +// val subtaskId = getRuntimeContext.getIndexOfThisSubtask +// offset = sortedOffsets.take(subtaskId).sum +// } +// +// override def mapPartition(values: Iterable[K], out: Collector[(Int, K)]): Unit = { +// val it = values.asScala +// it.zipWithIndex.foreach { case (value, idx) => +// out.collect((idx + offset, value)) +// } +// } +// }).withBroadcastSet(counts, "counts") +// +// zipped +// } +// +//} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala index 1024452..8544db0 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkByteBCast.scala @@ -72,7 +72,7 @@ object FlinkByteBCast { dataOutput.writeInt(StreamTypeVector) writeable.write(dataOutput) val array = dataOutput.toByteArray() - return new FlinkByteBCast[Vector](array) + new FlinkByteBCast[Vector](array) } def wrap(m: Matrix): FlinkByteBCast[Matrix] = { @@ -81,7 +81,7 @@ object FlinkByteBCast { dataOutput.writeInt(StreamTypeMatrix) writeable.write(dataOutput) val array = dataOutput.toByteArray() - return new FlinkByteBCast[Matrix](array) + new FlinkByteBCast[Matrix](array) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala index ebe473f..c818030 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkDistributedContext.scala @@ -18,7 +18,7 @@ */ package org.apache.mahout.flinkbindings -import org.apache.flink.api.java.ExecutionEnvironment +import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.mahout.math.drm.DistributedContext import org.apache.mahout.math.drm.DistributedEngine http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala index 6b12d11..d03aef7 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala @@ -18,17 +18,12 @@ */ package org.apache.mahout.flinkbindings -import java.util.Collection - -import scala.collection.JavaConverters._ import scala.collection.JavaConversions._ import scala.reflect.ClassTag import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.functions.ReduceFunction -import org.apache.flink.api.java.tuple.Tuple2 import org.apache.flink.api.java.typeutils.TypeExtractor import org.apache.hadoop.io.Writable -import org.apache.hadoop.mapred.SequenceFileInputFormat import org.apache.mahout.flinkbindings.blas._ import org.apache.mahout.flinkbindings.drm._ import org.apache.mahout.flinkbindings.io.HDFSUtil @@ -42,6 +37,8 @@ import org.apache.mahout.math.indexeddataset.Schema import org.apache.mahout.math.scalabindings._ import org.apache.mahout.math.scalabindings.RLikeOps._ +import org.apache.flink.api.scala._ + object FlinkEngine extends DistributedEngine { @@ -65,13 +62,13 @@ object FlinkEngine extends DistributedEngine { val metadata = hdfsUtils.readDrmHeader(path) - val unwrapKey = metadata.unwrapKeyFunction + val unwrapKey = metadata.unwrapKeyFunction - val dataset = env.readSequenceFile(classOf[Writable], classOf[VectorWritable], path) + val ds = env.readSequenceFile(classOf[Writable], classOf[VectorWritable], path) - val res = dataset.map(new MapFunction[Tuple2[Writable, VectorWritable], (Any, Vector)] { - def map(tuple: Tuple2[Writable, VectorWritable]): (Any, Vector) = { - (unwrapKey(tuple.f0), tuple.f1) + val res = ds.map(new MapFunction[(Writable, VectorWritable), (Any, Vector)] { + def map(tuple: (Writable, VectorWritable)): (Any, Vector) = { + (unwrapKey(tuple._1), tuple._2) } }) @@ -159,7 +156,7 @@ object FlinkEngine extends DistributedEngine { def reduce(v1: Vector, v2: Vector) = v1 + v2 }) - val list = sum.collect.asScala.toList + val list = sum.collect list.head } @@ -180,7 +177,7 @@ object FlinkEngine extends DistributedEngine { def reduce(v1: Vector, v2: Vector) = v1 + v2 }) - val list = result.collect.asScala.toList + val list = result.collect list.head } @@ -203,7 +200,7 @@ object FlinkEngine extends DistributedEngine { def reduce(v1: Double, v2: Double) = v1 + v2 }) - val list = sumOfSquares.collect.asScala.toList + val list = sumOfSquares.collect list.head } @@ -229,10 +226,8 @@ object FlinkEngine extends DistributedEngine { private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int) (implicit dc: DistributedContext): DrmDataSet[Int] = { val rows = (0 until m.nrow).map(i => (i, m(i, ::))) - val rowsJava: Collection[DrmTuple[Int]] = rows.asJava - val dataSetType = TypeExtractor.getForObject(rows.head) - dc.env.fromCollection(rowsJava, dataSetType).setParallelism(parallelismDegree) + dc.env.fromCollection(rows).setParallelism(parallelismDegree) } /** Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys. */ @@ -251,10 +246,7 @@ object FlinkEngine extends DistributedEngine { for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector) } - - val dataSetType = TypeExtractor.getForObject(nonParallelResult.head) - val result = dc.env.fromCollection(nonParallelResult.asJava, dataSetType) - + val result = dc.env.fromCollection(nonParallelResult) new CheckpointedFlinkDrm(ds=result, _nrow=nrow, _ncol=ncol) } http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala index 38fe312..f879e86 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewB.scala @@ -6,7 +6,7 @@ import scala.collection.JavaConverters._ import scala.reflect.ClassTag import org.apache.flink.api.common.functions.CoGroupFunction -import org.apache.flink.api.java.DataSet +import org.apache.flink.api.scala.DataSet import org.apache.flink.util.Collector import org.apache.mahout.flinkbindings._ import org.apache.mahout.flinkbindings.drm.FlinkDrm @@ -40,13 +40,13 @@ object FlinkOpAewB { val it1 = Lists.newArrayList(it1java).asScala val it2 = Lists.newArrayList(it2java).asScala - if (!it1.isEmpty && !it2.isEmpty) { + if (it1.nonEmpty && it2.nonEmpty) { val (idx, a) = it1.head val (_, b) = it2.head out.collect((idx, function(a, b))) - } else if (it1.isEmpty && !it2.isEmpty) { + } else if (it1.isEmpty && it2.nonEmpty) { out.collect(it2.head) - } else if (!it1.isEmpty && it2.isEmpty) { + } else if (it1.nonEmpty && it2.isEmpty) { out.collect(it1.head) } } http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala index ab434bb..67d710b 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAewScalar.scala @@ -18,18 +18,16 @@ */ package org.apache.mahout.flinkbindings.blas -import scala.collection.JavaConversions._ -import scala.reflect.ClassTag import org.apache.flink.api.common.functions.MapFunction -import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm -import org.apache.mahout.flinkbindings.drm.FlinkDrm +import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm} import org.apache.mahout.math.Matrix -import org.apache.mahout.math.drm.logical.OpAewScalar -import org.apache.mahout.math.drm.logical.OpAewUnaryFunc -import org.apache.mahout.math.scalabindings._ +import org.apache.mahout.math.drm.logical.{AbstractUnaryOp, OpAewScalar, TEwFunc} import org.apache.mahout.math.scalabindings.RLikeOps._ -import org.apache.mahout.math.drm.logical.AbstractUnaryOp -import org.apache.mahout.math.drm.logical.TEwFunc + +import scala.collection.JavaConversions._ +import scala.reflect.ClassTag + +import org.apache.flink.api.scala._ /** * Implementation is inspired by Spark-binding's OpAewScalar http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala index 274b1ca..e515b34 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAt.scala @@ -27,7 +27,6 @@ import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.api.common.functions.GroupReduceFunction import org.apache.flink.shaded.com.google.common.collect.Lists import org.apache.flink.util.Collector -import org.apache.mahout.flinkbindings._ import org.apache.mahout.flinkbindings.drm.FlinkDrm import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm import org.apache.mahout.math.Matrix @@ -37,6 +36,8 @@ import org.apache.mahout.math.drm.DrmTuple import org.apache.mahout.math.drm.logical.OpAt import org.apache.mahout.math.scalabindings.RLikeOps._ +import org.apache.flink.api.scala._ + /** * Implementation is taken from Spark's At * https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala @@ -53,7 +54,7 @@ object FlinkOpAt { val sparseParts = A.asBlockified.ds.flatMap(new FlatMapFunction[(Array[Int], Matrix), DrmTuple[Int]] { def flatMap(typle: (Array[Int], Matrix), out: Collector[DrmTuple[Int]]): Unit = typle match { - case (keys, block) => { + case (keys, block) => (0 until block.ncol).map(columnIdx => { val columnVector: Vector = new SequentialAccessSparseVector(ncol) @@ -61,9 +62,8 @@ object FlinkOpAt { columnVector(key) = block(idx, columnIdx) } - out.collect(new Tuple2(columnIdx, columnVector)) + out.collect((columnIdx, columnVector)) }) - } } }) @@ -73,7 +73,7 @@ object FlinkOpAt { def reduce(values: Iterable[(Int, Vector)], out: Collector[DrmTuple[Int]]): Unit = { val it = Lists.newArrayList(values).asScala val (idx, _) = it.head - val vector = it map { case (idx, vec) => vec } reduce (_ + _) + val vector = (it map { case (idx, vec) => vec }).sum out.collect((idx, vector)) } }) http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala index 0e30eff..629857a 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtA.scala @@ -5,7 +5,7 @@ import java.lang.Iterable import scala.collection.JavaConverters._ import org.apache.flink.api.common.functions._ -import org.apache.flink.api.java.DataSet +import org.apache.flink.api.scala.DataSet import org.apache.flink.configuration.Configuration import org.apache.flink.shaded.com.google.common.collect.Lists import org.apache.flink.util.Collector @@ -56,7 +56,7 @@ object FlinkOpAtA { def reduce(m1: Matrix, m2: Matrix) = m1 + m2 }).collect() - res.asScala.head + res.head } def fat(op: OpAtA[Any], A: FlinkDrm[Any]): FlinkDrm[Int] = { @@ -155,7 +155,7 @@ object FlinkOpAtA { val offsets = (0 to numSplits).map(i => i * (baseSplit + 1) - (0 max i - slack)) // And then we connect the ranges using gaps between offsets: - val ranges = offsets.sliding(2).map { offs => (offs(0) until offs(1)) } + val ranges = offsets.sliding(2).map { offs => offs(0) until offs(1) } ranges.toArray } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala index 0dd0dd2..b514868 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAtB.scala @@ -25,9 +25,7 @@ import scala.reflect.ClassTag import org.apache.flink.api.common.functions.FlatMapFunction import org.apache.flink.api.common.functions.GroupReduceFunction -import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.java.DataSet -import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.scala.DataSet import org.apache.flink.util.Collector import org.apache.mahout.flinkbindings._ import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm @@ -40,7 +38,7 @@ import org.apache.mahout.math.scalabindings.RLikeOps._ import com.google.common.collect.Lists - +import org.apache.flink.api.scala._ /** * Implementation is taken from Spark's AtB @@ -65,8 +63,8 @@ object FlinkOpAtB { joined.flatMap(new FlatMapFunction[Tuple2[(_, Vector), (_, Vector)], (Int, Matrix)] { def flatMap(in: Tuple2[(_, Vector), (_, Vector)], out: Collector[(Int, Matrix)]): Unit = { - val avec = in.f0._2 - val bvec = in.f1._2 + val avec = in._1._2 + val bvec = in._1._2 0.until(blockCount) map { blockKey => val blockStart = blockKey * blockHeight http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala index 503ab17..4302457 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpAx.scala @@ -20,17 +20,17 @@ package org.apache.mahout.flinkbindings.blas import java.util.List -import scala.reflect.ClassTag - import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.configuration.Configuration -import org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm -import org.apache.mahout.flinkbindings.drm.FlinkDrm -import org.apache.mahout.math.Matrix -import org.apache.mahout.math.Vector +import org.apache.mahout.flinkbindings.drm.{BlockifiedFlinkDrm, FlinkDrm} +import org.apache.mahout.math.{Matrix, Vector} import org.apache.mahout.math.drm.logical.OpAx import org.apache.mahout.math.scalabindings.RLikeOps._ +import scala.reflect.ClassTag + +import org.apache.flink.api.scala._ + /** * Implementation is taken from Spark's Ax @@ -40,7 +40,6 @@ object FlinkOpAx { def blockifiedBroadcastAx[K: ClassTag](op: OpAx[K], A: FlinkDrm[K]): FlinkDrm[K] = { implicit val ctx = A.context - // val x = drmBroadcast(op.x) val singletonDataSetX = ctx.env.fromElements(op.x) @@ -48,7 +47,7 @@ object FlinkOpAx { var x: Vector = null override def open(params: Configuration): Unit = { - val runtime = this.getRuntimeContext() + val runtime = this.getRuntimeContext val dsX: List[Vector] = runtime.getBroadcastVariable("vector") x = dsX.get(0) } http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala index 49ca7d5..6cf5e5c 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpCBind.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConversions._ import scala.reflect.ClassTag import org.apache.flink.api.common.functions.CoGroupFunction import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.java.DataSet +import org.apache.flink.api.scala.DataSet import org.apache.flink.util.Collector import org.apache.mahout.flinkbindings._ import org.apache.mahout.flinkbindings.drm._ @@ -35,6 +35,7 @@ import org.apache.mahout.math.scalabindings.RLikeOps._ import com.google.common.collect.Lists import org.apache.mahout.flinkbindings.DrmDataSet +import org.apache.mahout.math.scalabindings._ /** * Implementation is taken from Spark's cbind @@ -61,11 +62,11 @@ object FlinkOpCBind { val it1 = Lists.newArrayList(it1java).asScala val it2 = Lists.newArrayList(it2java).asScala - if (!it1.isEmpty && !it2.isEmpty) { + if (it1.nonEmpty && it2.nonEmpty) { val (idx, a) = it1.head val (_, b) = it2.head - val result: Vector = if (a.isDense && b.isDense) { + val result: Vector = if (a.isDense && b.isDense) { new DenseVector(n) } else { new SequentialAccessSparseVector(n) @@ -75,7 +76,7 @@ object FlinkOpCBind { result(n1 until n) := b out.collect((idx, result)) - } else if (it1.isEmpty && !it2.isEmpty) { + } else if (it1.isEmpty && it2.nonEmpty) { val (idx, b) = it2.head val result: Vector = if (b.isDense) { new DenseVector(n) @@ -84,7 +85,7 @@ object FlinkOpCBind { } result(n1 until n) := b out.collect((idx, result)) - } else if (!it1.isEmpty && it2.isEmpty) { + } else if (it1.nonEmpty && it2.isEmpty) { val (idx, a) = it1.head val result: Vector = if (a.isDense) { new DenseVector(n) http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala index 9ebff51..83beaa1 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpRBind.scala @@ -20,7 +20,7 @@ package org.apache.mahout.flinkbindings.blas import scala.reflect.ClassTag -import org.apache.flink.api.java.DataSet +import org.apache.flink.api.scala.DataSet import org.apache.mahout.flinkbindings.drm.FlinkDrm import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm import org.apache.mahout.math.Vector http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala index af3854d..989fad1 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala @@ -28,6 +28,8 @@ import org.apache.mahout.math.Matrix import org.apache.mahout.math.drm.logical.OpTimesRightMatrix import org.apache.mahout.math.scalabindings.RLikeOps._ +import org.apache.flink.api.scala._ + /** * Implementation is taken from Spark's OpTimesRightMatrix: * https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala @@ -43,7 +45,7 @@ object FlinkOpTimesRightMatrix { var inCoreB: Matrix = null override def open(params: Configuration): Unit = { - val runtime = this.getRuntimeContext() + val runtime = this.getRuntimeContext val dsB: java.util.List[Matrix] = runtime.getBroadcastVariable("matrix") inCoreB = dsB.get(0) } http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala index b6e6211..96d57d2 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/CheckpointedFlinkDrm.scala @@ -18,34 +18,21 @@ */ package org.apache.mahout.flinkbindings.drm -import scala.collection.JavaConverters._ -import scala.util.Random -import scala.reflect.{ClassTag, classTag} -import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.common.functions.ReduceFunction -import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat +import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction} import org.apache.flink.api.java.tuple.Tuple2 -import org.apache.hadoop.io.IntWritable -import org.apache.hadoop.io.LongWritable -import org.apache.hadoop.io.Text -import org.apache.hadoop.io.Writable -import org.apache.hadoop.mapred.FileOutputFormat -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapred.SequenceFileOutputFormat -import org.apache.mahout.flinkbindings._ -import org.apache.mahout.flinkbindings.DrmDataSet -import org.apache.mahout.math.DenseMatrix -import org.apache.mahout.math.Matrix -import org.apache.mahout.math.SparseMatrix -import org.apache.mahout.math.Vector -import org.apache.mahout.math.VectorWritable -import org.apache.mahout.math.drm._ -import org.apache.mahout.math.drm.CacheHint -import org.apache.mahout.math.drm.CheckpointedDrm -import org.apache.mahout.math.drm.DistributedContext -import org.apache.mahout.math.drm.DrmTuple -import org.apache.mahout.math.scalabindings._ +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat +import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable} +import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat} +import org.apache.mahout.flinkbindings.{DrmDataSet, _} +import org.apache.mahout.math.{DenseMatrix, Matrix, SparseMatrix, Vector, VectorWritable} +import org.apache.mahout.math.drm.{CacheHint, CheckpointedDrm, DistributedContext, DrmTuple, _} import org.apache.mahout.math.scalabindings.RLikeOps._ +import org.apache.mahout.math.scalabindings._ + +import scala.collection.JavaConverters._ +import scala.reflect.{ClassTag, classTag} +import scala.util.Random class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], private var _nrow: Long = CheckpointedFlinkDrm.UNKNOWN, @@ -71,7 +58,7 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], } }) - val list = res.collect().asScala.toList + val list = res.collect() list.head } @@ -95,7 +82,7 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = this def collect: Matrix = { - val data = ds.collect().asScala.toList + val data = ds.collect() val isDense = data.forall(_._2.isDense) val cols = ncol @@ -139,7 +126,7 @@ class CheckpointedFlinkDrm[K: ClassTag](val ds: DrmDataSet[K], val keyTag = implicitly[ClassTag[K]] val convertKey = keyToWritableFunc(keyTag) - val writableDataset = ds.map(new MapFunction[(K, Vector), Tuple2[Writable, VectorWritable]] { + val writableDataset = ds.map(new MapFunction[(K, Vector), (Writable, VectorWritable)] { def map(tuple: (K, Vector)): Tuple2[Writable, VectorWritable] = tuple match { case (idx, vec) => new Tuple2(convertKey(idx), new VectorWritable(vec)) } http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala index dbc6b11..c9c1b2c 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/drm/FlinkDrm.scala @@ -20,23 +20,17 @@ package org.apache.mahout.flinkbindings.drm import java.lang.Iterable -import scala.collection.JavaConverters.iterableAsScalaIterableConverter -import scala.reflect.ClassTag - -import org.apache.flink.api.common.functions.FlatMapFunction -import org.apache.flink.api.common.functions.MapPartitionFunction -import org.apache.flink.api.java.ExecutionEnvironment +import org.apache.flink.api.common.functions.{FlatMapFunction, MapPartitionFunction} +import org.apache.flink.api.scala._ import org.apache.flink.util.Collector -import org.apache.mahout.flinkbindings.BlockifiedDrmDataSet -import org.apache.mahout.flinkbindings.DrmDataSet -import org.apache.mahout.flinkbindings.FlinkDistributedContext -import org.apache.mahout.flinkbindings.wrapContext -import org.apache.mahout.math.DenseMatrix -import org.apache.mahout.math.Matrix -import org.apache.mahout.math.SparseRowMatrix +import org.apache.mahout.flinkbindings.{BlockifiedDrmDataSet, DrmDataSet, FlinkDistributedContext, wrapContext} import org.apache.mahout.math.drm.DrmTuple -import org.apache.mahout.math.scalabindings._ import org.apache.mahout.math.scalabindings.RLikeOps._ +import org.apache.mahout.math.scalabindings._ +import org.apache.mahout.math.{DenseMatrix, Matrix, SparseRowMatrix} + +import scala.collection.JavaConverters.iterableAsScalaIterableConverter +import scala.reflect.ClassTag trait FlinkDrm[K] { def executionEnvironment: ExecutionEnvironment @@ -100,7 +94,7 @@ class BlockifiedFlinkDrm[K: ClassTag](val ds: BlockifiedDrmDataSet[K], val ncol: def asRowWise = { val out = ds.flatMap(new FlatMapFunction[(Array[K], Matrix), DrmTuple[K]] { - def flatMap(typle: (Array[K], Matrix), out: Collector[DrmTuple[K]]): Unit = typle match { + def flatMap(tuple: (Array[K], Matrix), out: Collector[DrmTuple[K]]): Unit = tuple match { case (keys, block) => keys.view.zipWithIndex.foreach { case (key, idx) => out.collect((key, block(idx, ::))) http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala index 656b8de..6b8f2ae 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/package.scala @@ -19,18 +19,21 @@ package org.apache.mahout import org.apache.flink.api.common.functions.{FilterFunction, MapFunction} -import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} +import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment} import org.apache.mahout.flinkbindings.drm.{CheckpointedFlinkDrmOps, CheckpointedFlinkDrm, FlinkDrm, RowsFlinkDrm} import org.apache.mahout.math.{DenseVector, Matrix, MatrixWritable, Vector, VectorWritable} import org.apache.mahout.math.drm.{BlockifiedDrmTuple, CheckpointedDrm, DistributedContext, DrmTuple, _} import org.slf4j.LoggerFactory +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.utils._ + import scala.Array._ import scala.reflect.ClassTag package object flinkbindings { - private[flinkbindings] val log = LoggerFactory.getLogger("apache.org.mahout.flinkbindings") + private[flinkbindings] val log = LoggerFactory.getLogger("org.apache.mahout.flinkbindings") /** Row-wise organized DRM dataset type */ type DrmDataSet[K] = DataSet[DrmTuple[K]] @@ -78,7 +81,7 @@ package object flinkbindings { def readCsv(file: String, delim: String = ",", comment: String = "#") - (implicit dc: DistributedContext): CheckpointedDrm[Int] = { + (implicit dc: DistributedContext): CheckpointedDrm[Long] = { val vectors = dc.env.readTextFile(file) .filter(new FilterFunction[String] { def filter(in: String): Boolean = { @@ -94,8 +97,8 @@ package object flinkbindings { datasetToDrm(vectors) } - def datasetToDrm(ds: DataSet[Vector]): CheckpointedDrm[Int] = { - val zipped = new DataSetOps(ds).zipWithIndex + def datasetToDrm(ds: DataSet[Vector]): CheckpointedDrm[Long] = { + val zipped = ds.zipWithIndex datasetWrap(zipped) } http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala index dd76ff4..6fb71ea 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/DistributedFlinkSuite.scala @@ -18,8 +18,7 @@ */ package org.apache.mahout.flinkbindings -import org.apache.flink.api.java.ExecutionEnvironment -import org.apache.mahout.flinkbindings._ +import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.mahout.math.drm.DistributedContext import org.apache.mahout.test.DistributedMahoutSuite import org.scalatest.Suite http://git-wip-us.apache.org/repos/asf/mahout/blob/e2ab67f2/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala index a9e8436..4e713c7 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/examples/ReadCsvExample.scala @@ -18,7 +18,7 @@ */ package org.apache.mahout.flinkbindings.examples -import org.apache.flink.api.java.ExecutionEnvironment +import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.mahout.math.drm._ import org.apache.mahout.math.drm.RLikeDrmOps._ import org.apache.mahout.flinkbindings._
