Repository: mahout
Updated Branches:
  refs/heads/flink-binding e75c5f606 -> 93ebed620
MAHOUT-1812: Implement drmParallelizeEmptyLong(...) in flink Bindings

Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/93ebed62
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/93ebed62
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/93ebed62

Branch: refs/heads/flink-binding
Commit: 93ebed620ae7d7da1a6ac2dbe944a8fd517de1c0
Parents: e75c5f6
Author: smarthi <[email protected]>
Authored: Tue Mar 15 15:03:35 2016 -0400
Committer: smarthi <[email protected]>
Committed: Tue Mar 15 15:04:21 2016 -0400

----------------------------------------------------------------------
 .../mahout/flinkbindings/FlinkEngine.scala      | 95 +++++++++++---------
 .../standard/NaiveBayesTestSuite.scala          | 18 ++++
 2 files changed, 71 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/93ebed62/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
index 3988d51..8db063b 100644
--- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala
@@ -43,11 +43,11 @@ object FlinkEngine extends DistributedEngine {
   var hdfsUtils: HDFSUtil = Hadoop2HDFSUtil
 
   /**
-   * Load DRM from hdfs (as in Mahout DRM format).
-   *
-   * @param path The DFS path to load from
-   * @param parMin Minimum parallelism after load (equivalent to #par(min=...)).
-   */
+    * Load DRM from hdfs (as in Mahout DRM format).
+    *
+    * @param path The DFS path to load from
+    * @param parMin Minimum parallelism after load (equivalent to #par(min=...)).
+    */
   override def drmDfsRead(path: String, parMin: Int = 1)
                          (implicit dc: DistributedContext): CheckpointedDrm[_] = {
 
@@ -103,7 +103,7 @@ object FlinkEngine extends DistributedEngine {
   override def indexedDatasetDFSRead(src: String, schema: Schema, existingRowIDs: Option[BiDictionary])
                                     (implicit sc: DistributedContext): IndexedDataset = ???
 
-  override def indexedDatasetDFSReadElements(src: String,schema: Schema, existingRowIDs: Option[BiDictionary])
+  override def indexedDatasetDFSReadElements(src: String, schema: Schema, existingRowIDs: Option[BiDictionary])
                                             (implicit sc: DistributedContext): IndexedDataset = ???
 
 
@@ -115,16 +115,16 @@ object FlinkEngine extends DistributedEngine {
    */
   override def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = super.optimizerRewrite(action)
 
-  /**
-   * Translates logical plan into Flink execution plan.
-   **/
+  /**
+    * Translates logical plan into Flink execution plan.
+    **/
   override def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = {
     // Flink-specific Physical Plan translation.
     implicit val typeInformation = generateTypeInformation[K]
     val drm = flinkTranslate(plan)
     val newcp = new CheckpointedFlinkDrm(ds = drm.asRowWise.ds, _nrow = plan.nrow, _ncol = plan.ncol)
-    // newcp.ds.getExecutionEnvironment.createProgramPlan("plan")
+    // newcp.ds.getExecutionEnvironment.createProgramPlan("plan")
     newcp.cache()
   }
 
@@ -152,10 +152,10 @@ object FlinkEngine extends DistributedEngine {
         // express ABt via AtB: let C=At and D=Bt, and calculate CtD
         // TODO: create specific implementation of ABt, see MAHOUT-1750
         val opAt = OpAt(a.asInstanceOf[DrmLike[Int]]) // TODO: casts!
-        val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]])) 
+        val at = FlinkOpAt.sparseTrick(opAt, flinkTranslate(a.asInstanceOf[DrmLike[Int]]))
         val c = new CheckpointedFlinkDrm(at.asRowWise.ds, _nrow = opAt.nrow, _ncol = opAt.ncol)
         val opBt = OpAt(b.asInstanceOf[DrmLike[Int]]) // TODO: casts!
-        val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]])) 
+        val bt = FlinkOpAt.sparseTrick(opBt, flinkTranslate(b.asInstanceOf[DrmLike[Int]]))
         val d = new CheckpointedFlinkDrm(bt.asRowWise.ds, _nrow = opBt.nrow, _ncol = opBt.ncol)
         FlinkOpAtB.notZippable(OpAtB(c, d), flinkTranslate(c), flinkTranslate(d)).asInstanceOf[FlinkDrm[K]]
       case op@OpAtA(a) if op.keyClassTag == ClassTag.Int ⇒ FlinkOpAtA.at_a(op, flinkTranslate(a)).asInstanceOf[FlinkDrm[K]]
@@ -190,11 +190,11 @@ object FlinkEngine extends DistributedEngine {
     }
   }
 
-  /**
-   * returns a vector that contains a column-wise sum from DRM
-   */
+  /**
+    * returns a vector that contains a column-wise sum from DRM
+    */
   override def colSums[K](drm: CheckpointedDrm[K]): Vector = {
-    implicit val kTag: ClassTag[K] = drm.keyClassTag
+    implicit val kTag: ClassTag[K] = drm.keyClassTag
 
     implicit val typeInformation = generateTypeInformation[K]
 
@@ -208,7 +208,7 @@ object FlinkEngine extends DistributedEngine {
 
   /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint.
     */
   override def numNonZeroElementsPerColumn[K](drm: CheckpointedDrm[K]): Vector = {
-    implicit val kTag: ClassTag[K] = drm.keyClassTag
+    implicit val kTag: ClassTag[K] = drm.keyClassTag
 
     implicit val typeInformation = generateTypeInformation[K]
 
@@ -228,18 +228,18 @@ object FlinkEngine extends DistributedEngine {
     list.head
   }
 
-  /**
-   * returns a vector that contains a column-wise mean from DRM
-   */
+  /**
+    * returns a vector that contains a column-wise mean from DRM
+    */
   override def colMeans[K](drm: CheckpointedDrm[K]): Vector = {
     drm.colSums() / drm.nrow
   }
 
   /**
-   * Calculates the element-wise squared norm of a matrix
-   */
+    * Calculates the element-wise squared norm of a matrix
+    */
   override def norm[K](drm: CheckpointedDrm[K]): Double = {
-    implicit val kTag: ClassTag[K] = drm.keyClassTag
+    implicit val kTag: ClassTag[K] = drm.keyClassTag
 
     implicit val typeInformation = generateTypeInformation[K]
 
     val sumOfSquares = drm.ds.map {
@@ -248,13 +248,13 @@ object FlinkEngine extends DistributedEngine {
       }
     }.reduce(_ + _)
 
-//    val sumOfSquares = drm.ds.map(new MapFunction[(K, Vector), Double] {
-//      def map(tuple: (K, Vector)): Double = tuple match {
-//        case (idx, vec) => vec dot vec
-//      }
-//    }).reduce(new ReduceFunction[Double] {
-//      def reduce(v1: Double, v2: Double) = v1 + v2
-//    })
+    //    val sumOfSquares = drm.ds.map(new MapFunction[(K, Vector), Double] {
+    //      def map(tuple: (K, Vector)): Double = tuple match {
+    //        case (idx, vec) => vec dot vec
+    //      }
+    //    }).reduce(new ReduceFunction[Double] {
+    //      def reduce(v1: Double, v2: Double) = v1 + v2
+    //    })
 
     val list = sumOfSquares.collect
 
@@ -263,12 +263,12 @@ object FlinkEngine extends DistributedEngine {
   }
 
   /** Broadcast support */
-  override def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector] = 
+  override def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector] =
     FlinkByteBCast.wrap(v)
 
 
   /** Broadcast support */
-  override def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] = 
+  override def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] =
     FlinkByteBCast.wrap(m)
 
 
@@ -278,19 +278,19 @@ object FlinkEngine extends DistributedEngine {
 
     val parallelDrm = parallelize(m, numPartitions)
 
-    new CheckpointedFlinkDrm(ds=parallelDrm, _nrow=m.numRows(), _ncol=m.numCols())
+    new CheckpointedFlinkDrm(ds = parallelDrm, _nrow = m.numRows(), _ncol = m.numCols())
   }
 
   private[flinkbindings] def parallelize(m: Matrix, parallelismDegree: Int)
-                                        (implicit dc: DistributedContext): DrmDataSet[Int] = {
-    val rows = (0 until m.nrow).map(i => (i, m(i, ::)))//.toSeq.sortWith((ii, jj) => ii._1 < jj._1)
+                                        (implicit dc: DistributedContext): DrmDataSet[Int] = {
+    val rows = (0 until m.nrow).map(i => (i, m(i, ::))) //.toSeq.sortWith((ii, jj) => ii._1 < jj._1)
     val dataSetType = TypeExtractor.getForObject(rows.head)
     //TODO: Make Sure that this is the correct partitioning scheme
     dc.env.fromCollection(rows)
-      .partitionByRange(0)
-      .setParallelism(parallelismDegree)
-      .rebalance()
+      .partitionByRange(0)
+      .setParallelism(parallelismDegree)
+      .rebalance()
   }
 
   /** Parallelize in-core matrix as flink distributed matrix, using row labels as a data set keys. */
@@ -307,7 +307,7 @@ object FlinkEngine extends DistributedEngine {
 
   /** This creates an empty DRM with specified number of partitions and cardinality.
     */
   override def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
                                   (implicit dc: DistributedContext): CheckpointedDrm[Int] = {
-    val nonParallelResult = (0 to numPartitions).flatMap { part =>
+    val nonParallelResult = (0 to numPartitions).flatMap { part ⇒
       val partNRow = (nrow - 1) / numPartitions + 1
       val partStart = partNRow * part
       val partEnd = Math.min(partStart + partNRow, nrow)
 
       for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
     }
     val result = dc.env.fromCollection(nonParallelResult)
-    new CheckpointedFlinkDrm[Int](ds=result, _nrow=nrow, _ncol=ncol)
+    new CheckpointedFlinkDrm[Int](ds = result, _nrow = nrow, _ncol = ncol)
   }
 
   /** Creates empty DRM with non-trivial height */
   override def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
-                                      (implicit sc: DistributedContext): CheckpointedDrm[Long] = ???
-
+                                      (implicit dc: DistributedContext): CheckpointedDrm[Long] = {
+
+    val nonParallelResult = (0 to numPartitions).flatMap { part ⇒
+      val partNRow = (nrow - 1) / numPartitions + 1
+      val partStart = partNRow * part
+      val partEnd = Math.min(partStart + partNRow, nrow)
+
+      for (i ← partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
+    }
+
+    val result = dc.env.fromCollection(nonParallelResult)
+    new CheckpointedFlinkDrm[Long](ds = result, nrow, ncol, cacheHint = CacheHint.NONE)
+  }
 
   /**
    * Convert non-int-keyed matrix to an int-keyed, computing optionally mapping from old keys


http://git-wip-us.apache.org/repos/asf/mahout/blob/93ebed62/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala
----------------------------------------------------------------------
diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala
index d6feed9..0f1d6bc 100644
--- a/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala
+++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/standard/NaiveBayesTestSuite.scala
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.mahout.flinkbindings.standard
 
 import org.apache.mahout.classifier.naivebayes.NBTestBase
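----------------------------------------------------------------------

Reviewer note: a minimal sketch of driving the newly implemented
drmParallelizeEmptyLong(...) against a local Flink environment. The context
wiring (FlinkDistributedContext over a local ExecutionEnvironment) follows
the pattern used elsewhere in the flink-bindings tests; the object and value
names below are illustrative assumptions, not part of this commit.

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.mahout.flinkbindings.{FlinkDistributedContext, FlinkEngine}

object DrmParallelizeEmptyLongSketch extends App {

  // Assumed wiring: wrap a local Flink ExecutionEnvironment in the bindings'
  // DistributedContext so it satisfies the method's implicit dc parameter.
  val env = ExecutionEnvironment.getExecutionEnvironment
  implicit val dc: FlinkDistributedContext = new FlinkDistributedContext(env)

  // A 1000 x 25 empty DRM over the default 10 partitions. With nrow = 1000,
  // partNRow = (1000 - 1) / 10 + 1 = 100, so parts 0..9 each yield 100 rows
  // and the extra part 10 is empty (partStart == partEnd == 1000).
  val drm = FlinkEngine.drmParallelizeEmptyLong(nrow = 1000L, ncol = 25)

  // Cardinality is carried through; every cell is an implicit zero.
  assert(drm.nrow == 1000L && drm.ncol == 25)
}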
