MAHOUT-1570: Flink: drmParallelizeEmpty and extra tests
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/d13f4884 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/d13f4884 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/d13f4884 Branch: refs/heads/flink-binding Commit: d13f48849ad5658cc03acea57f7c5d9d14bad137 Parents: fcc6cf1 Author: Alexey Grigorev <[email protected]> Authored: Tue Aug 25 15:38:50 2015 +0200 Committer: Alexey Grigorev <[email protected]> Committed: Fri Sep 25 17:45:53 2015 +0200 ---------------------------------------------------------------------- .../mahout/flinkbindings/FlinkEngine.scala | 68 +++++++----------- .../mahout/flinkbindings/DrmLikeOpsSuite.scala | 72 ++++++++++++++++++++ 2 files changed, 95 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/d13f4884/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala index a21591d..35c6b76 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala @@ -19,8 +19,10 @@ package org.apache.mahout.flinkbindings import java.util.Collection + import scala.collection.JavaConverters._ import scala.reflect.ClassTag + import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.api.java.tuple.Tuple2 @@ -29,57 +31,20 @@ import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.FileInputFormat import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapred.SequenceFileInputFormat -import org.apache.mahout.flinkbindings.blas.FlinkOpAewB -import org.apache.mahout.flinkbindings.blas.FlinkOpAewScalar -import org.apache.mahout.flinkbindings.blas.FlinkOpAt -import org.apache.mahout.flinkbindings.blas.FlinkOpAtB -import org.apache.mahout.flinkbindings.blas.FlinkOpAx -import org.apache.mahout.flinkbindings.blas.FlinkOpCBind -import org.apache.mahout.flinkbindings.blas.FlinkOpMapBlock -import org.apache.mahout.flinkbindings.blas.FlinkOpRBind -import org.apache.mahout.flinkbindings.blas.FlinkOpRowRange -import org.apache.mahout.flinkbindings.blas.FlinkOpTimesRightMatrix import org.apache.mahout.flinkbindings._ -import org.apache.mahout.flinkbindings.drm.CheckpointedFlinkDrm -import org.apache.mahout.flinkbindings.drm.FlinkDrm -import org.apache.mahout.flinkbindings.drm.RowsFlinkDrm +import org.apache.mahout.flinkbindings.blas._ +import org.apache.mahout.flinkbindings.drm._ import org.apache.mahout.flinkbindings.io.HDFSUtil import org.apache.mahout.flinkbindings.io.Hadoop1HDFSUtil -import org.apache.mahout.math.Matrix -import org.apache.mahout.math.Vector -import org.apache.mahout.math.VectorWritable -import org.apache.mahout.math.drm.BCast -import org.apache.mahout.math.drm.BlockMapFunc2 -import org.apache.mahout.math.drm.BlockReduceFunc -import org.apache.mahout.math.drm.CacheHint -import org.apache.mahout.math.drm.CheckpointedDrm -import org.apache.mahout.math.drm.DistributedContext -import org.apache.mahout.math.drm.DistributedEngine -import org.apache.mahout.math.drm.DrmLike -import org.apache.mahout.math.drm.DrmTuple -import org.apache.mahout.math.drm.drm2drmCpOps -import org.apache.mahout.math.drm.logical.OpABt -import org.apache.mahout.math.drm.logical.OpAewB -import org.apache.mahout.math.drm.logical.OpAewScalar -import org.apache.mahout.math.drm.logical.OpAewUnaryFunc -import org.apache.mahout.math.drm.logical.OpAt -import org.apache.mahout.math.drm.logical.OpAtA -import org.apache.mahout.math.drm.logical.OpAtB -import org.apache.mahout.math.drm.logical.OpAtx -import org.apache.mahout.math.drm.logical.OpAx -import org.apache.mahout.math.drm.logical.OpCbind -import org.apache.mahout.math.drm.logical.OpMapBlock -import org.apache.mahout.math.drm.logical.OpRbind -import org.apache.mahout.math.drm.logical.OpRowRange -import org.apache.mahout.math.drm.logical.OpTimesRightMatrix +import org.apache.mahout.math._ +import org.apache.mahout.math.drm._ +import org.apache.mahout.math.drm.logical._ import org.apache.mahout.math.indexeddataset.BiDictionary import org.apache.mahout.math.indexeddataset.IndexedDataset import org.apache.mahout.math.indexeddataset.Schema import org.apache.mahout.math.scalabindings._ import org.apache.mahout.math.scalabindings.RLikeOps._ -import org.apache.mahout.flinkbindings.blas.FlinkOpAtA -import org.apache.mahout.math.drm.logical.OpCbindScalar -import org.apache.mahout.math.drm.logical.OpAewUnaryFuncFusion + object FlinkEngine extends DistributedEngine { @@ -259,11 +224,24 @@ object FlinkEngine extends DistributedEngine { /** Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys. */ override def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1) - (implicit sc: DistributedContext): CheckpointedDrm[String] = ??? + (implicit dc: DistributedContext): CheckpointedDrm[String] = ??? /** This creates an empty DRM with specified number of partitions and cardinality. */ override def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10) - (implicit sc: DistributedContext): CheckpointedDrm[Int] = ??? + (implicit dc: DistributedContext): CheckpointedDrm[Int] = { + val nonParallelResult = (0 to numPartitions).flatMap { part => + val partNRow = (nrow - 1) / numPartitions + 1 + val partStart = partNRow * part + val partEnd = Math.min(partStart + partNRow, nrow) + + for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector) + } + + val dataSetType = TypeExtractor.getForObject(nonParallelResult.head) + val result = dc.env.fromCollection(nonParallelResult.asJava, dataSetType) + + new CheckpointedFlinkDrm(ds=result, _nrow=nrow, _ncol=ncol) + } /** Creates empty DRM with non-trivial height */ override def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10) http://git-wip-us.apache.org/repos/asf/mahout/blob/d13f4884/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala new file mode 100644 index 0000000..4c75afa --- /dev/null +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/DrmLikeOpsSuite.scala @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.mahout.flinkbindings + +import org.apache.mahout.flinkbindings._ +import org.apache.mahout.math._ +import org.apache.mahout.math.drm._ +import org.apache.mahout.math.drm.RLikeDrmOps._ +import org.apache.mahout.math.scalabindings._ +import org.apache.mahout.math.scalabindings.RLikeOps._ +import org.junit.runner.RunWith +import org.scalatest.FunSuite +import org.scalatest.junit.JUnitRunner +import org.slf4j.Logger +import org.slf4j.LoggerFactory + + +@RunWith(classOf[JUnitRunner]) +class DrmLikeOpsSuite extends FunSuite with DistributedFlinkSuite { + + test("norm") { + val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5)) + val A = drmParallelize(m = inCoreA, numPartitions = 2) + + (inCoreA.norm - A.norm) should be < 1e-6 + } + + test("colSums") { + val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5)) + val A = drmParallelize(m = inCoreA, numPartitions = 2) + + (inCoreA.colSums - A.colSums).norm(2) should be < 1e-6 + } + + test("rowSums") { + val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5)) + val A = drmParallelize(m = inCoreA, numPartitions = 2) + + (inCoreA.rowSums - A.rowSums).norm(2) should be < 1e-6 + } + + test("rowMeans") { + val inCoreA = dense((1, 2, 3), (2, 3, 4), (3, 4, 5)) + val A = drmParallelize(m = inCoreA, numPartitions = 2) + + (inCoreA.rowMeans - A.rowMeans).norm(2) should be < 1e-6 + } + + test("drmParallelizeEmpty") { + val emptyDrm = drmParallelizeEmpty(nrow = 2, ncol = 2, numPartitions = 2) + val expected = dense((0, 0), (0, 0)) + + (emptyDrm.collect - expected).norm should be < 1e-6 + } + +} \ No newline at end of file
