Repository: mahout Updated Branches: refs/heads/master add081015 -> 0c39c2999
MAHOUT-1835: Remove countsPerPartition in Flink/blas/package.scala Project: http://git-wip-us.apache.org/repos/asf/mahout/repo Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/0c39c299 Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/0c39c299 Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/0c39c299 Branch: refs/heads/master Commit: 0c39c29998c8b6f93174bf7458f1b4732405c3e4 Parents: add0810 Author: smarthi <[email protected]> Authored: Sat Apr 23 10:17:40 2016 -0400 Committer: smarthi <[email protected]> Committed: Sat Apr 23 10:17:40 2016 -0400 ---------------------------------------------------------------------- .../mahout/flinkbindings/blas/package.scala | 31 +++----------------- pom.xml | 2 +- 2 files changed, 5 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mahout/blob/0c39c299/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala ---------------------------------------------------------------------- diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala index 553a422..32a8cac 100644 --- a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala +++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala @@ -23,42 +23,19 @@ import java.lang.Iterable import org.apache.flink.api.common.functions.RichMapPartitionFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.utils._ import org.apache.flink.configuration.Configuration import org.apache.flink.util.Collector import org.apache.mahout.flinkbindings.drm.FlinkDrm import org.apache.mahout.math.drm.DrmLike +import org.apache.mahout.math.scalabindings.RLikeOps._ import org.apache.mahout.math.{RandomAccessSparseVector, Vector} -import org.apache.mahout.math._ -import scalabindings._ -import RLikeOps._ -import scala.collection._ import scala.reflect.ClassTag package object blas { /** - * To compute tuples (PartitionIndex, PartitionElementCount) - * - * @param drmDataSet - DRM Dataset - * @tparam K - Key type - * @return (PartitionIndex, PartitionElementCount) - */ - //TODO: Remove this when FLINK-3657 is merged into Flink codebase and - // replace by call to DataSetUtils.countElementsPerPartition(DataSet[K]) - private[flinkbindings] def countsPerPartition[K](drmDataSet: DataSet[K]): DataSet[(Int, Int)] = { - drmDataSet.mapPartition { - new RichMapPartitionFunction[K, (Int, Int)] { - override def mapPartition(iterable: Iterable[K], collector: Collector[(Int, Int)]) = { - val count: Int = Iterator(iterable).size - val index: Int = getRuntimeContext.getIndexOfThisSubtask - collector.collect((index, count)) - } - } - } - } - - /** * Rekey matrix dataset keys to consecutive int keys. * @param drmDataSet incoming matrix row-wise dataset * @param computeMap if true, also compute mapping between old and new keys @@ -79,13 +56,13 @@ package object blas { val env = datasetA.getExecutionEnvironment // First, compute partition sizes. - val partSizes = countsPerPartition(datasetA).collect().toList + val partSizes = DataSetUtils(datasetA).countElementsPerPartition.collect().toList // Starting indices var startInd = new Array[Int](datasetA.getParallelism) // Save counts - for (pc <- partSizes) startInd(pc._1) = pc._2 + for (pc <- partSizes) startInd(pc._1) = pc._2.toInt // compute cumulative sum val cumulativeSum = startInd.scanLeft(0)(_ + _).init http://git-wip-us.apache.org/repos/asf/mahout/blob/0c39c299/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 5136320..c669467 100644 --- a/pom.xml +++ b/pom.xml @@ -122,7 +122,7 @@ <scala.compat.version>2.10</scala.compat.version> <scala.version>2.10.4</scala.version> <spark.version>1.5.2</spark.version> - <flink.version>1.0.1</flink.version> + <flink.version>1.0.2</flink.version> <h2o.version>0.1.25</h2o.version> <jackson.version>2.7.2</jackson.version> </properties>
