Repository: mahout Updated Branches: refs/heads/flink-binding b52036cc9 -> d9deb68ff
NoJira: minor fixes Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/d9deb68f Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/d9deb68f Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/d9deb68f Branch: refs/heads/flink-binding Commit: d9deb68ff1d744d7994a60181f302f714543dff5 Parents: b52036c Author: smarthi <[email protected]> Authored: Tue Mar 15 17:32:29 2016 -0400 Committer: smarthi <[email protected]> Committed: Tue Mar 15 17:32:29 2016 -0400 ---------------------------------------------------------------------- .../apache/mahout/flinkbindings/FlinkEngine.scala | 8 -------- .../blas/FlinkOpTimesRightMatrix.scala | 11 +---------- .../flinkbindings/FlinkByteBCastSuite.scala | 18 ++++++++++++++++++ .../mahout/sparkbindings/blas/package.scala | 3 +-- 4 files changed, 20 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/d9deb68f/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala index 031b9b0..958b6cf 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/FlinkEngine.scala @@ -248,14 +248,6 @@ object FlinkEngine extends DistributedEngine { } }.reduce(_ + _) - // val sumOfSquares = drm.ds.map(new MapFunction[(K, Vector), Double] { - // def map(tuple: (K, Vector)): Double = tuple match { - // case (idx, vec) => vec dot vec - // } - // }).reduce(new ReduceFunction[Double] { - // def reduce(v1: Double, v2: Double) = v1 + v2 - // }) - val list = sumOfSquares.collect // check on this --why is it returning a list? http://git-wip-us.apache.org/repos/asf/mahout/blob/d9deb68f/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala index 4e5b1a7..70ad9d3 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/FlinkOpTimesRightMatrix.scala @@ -39,22 +39,13 @@ object FlinkOpTimesRightMatrix { implicit val ctx = A.context implicit val kTag = op.keyClassTag - - /* HACK: broadcasting the matrix using Flink's .withBroadcastSet(singletonDataSetB) on a matrix causes a backend Kryo - * Issue resulkting in a stackOverflow error. + * Issue resulting in a stackOverflow error. * * Quick fix is to instead break the matrix down into a list of rows and then rebuild it on the back end * * TODO: this is obviously very inefficient... need to use the correct broadcast on the matrix itself. */ - - // val singletonDataSetB = ctx.env.fromElements(inCoreB) - - - // val inCoreBcastB = FlinkEngine.drmBroadcast(inCoreB) - // val singletonDataSetB = ctx.env.fromElements(inCoreB) - val rows = (0 until inCoreB.nrow).map(i => (i, inCoreB(i, ::))) val dataSetType = TypeExtractor.getForObject(rows.head) val singletonDataSetB = ctx.env.fromCollection(rows) http://git-wip-us.apache.org/repos/asf/mahout/blob/d9deb68f/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala ---------------------------------------------------------------------- diff --git a/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala b/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala index 4aa524f..4953647 100644 --- a/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala +++ b/flink/src/test/scala/org/apache/mahout/flinkbindings/FlinkByteBCastSuite.scala @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.mahout.flinkbindings import org.apache.mahout.math.scalabindings.RLikeOps._ http://git-wip-us.apache.org/repos/asf/mahout/blob/d9deb68f/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala ---------------------------------------------------------------------- diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala index 8c4eef2..a78e68e 100644 --- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala +++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala @@ -34,9 +34,8 @@ package object blas { implicit def drmRdd2ops[K](rdd: DrmRdd[K]): DrmRddOps[K] = new DrmRddOps[K](rdd) - /** - * Rekey matrix dataset keys to consequtive int keys. + * Rekey matrix dataset keys to consecutive int keys. * @param rdd incoming matrix row-wise dataset * * @param computeMap if true, also compute mapping between old and new keys
