MAHOUT-1820: Add a method to generate Tuple<PartitionId, PartitionElementCount> to support the Flink backend; this closes apache/mahout#207


Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/5863bbf1
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/5863bbf1
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/5863bbf1

Branch: refs/heads/master
Commit: 5863bbf1587cabef68f470fc8cd4812a2c1f8f79
Parents: f0e22e2
Author: smarthi <[email protected]>
Authored: Tue Mar 29 19:26:03 2016 -0400
Committer: smarthi <[email protected]>
Committed: Tue Mar 29 19:26:03 2016 -0400

----------------------------------------------------------------------
 .../mahout/flinkbindings/blas/package.scala     | 51 ++++++++++++++++++++
 1 file changed, 51 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mahout/blob/5863bbf1/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
new file mode 100644
index 0000000..6a3ac0e
--- /dev/null
+++ b/flink/src/main/scala/org/apache/mahout/flinkbindings/blas/package.scala
@@ -0,0 +1,51 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements. See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership. The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+package org.apache.mahout.flinkbindings
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichMapPartitionFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.util.Collector
+
+import scala.collection._
+
package object blas {

  // TODO: Remove this when FLINK-3657 is merged into the Flink codebase and
  // replace it with a call to DataSetUtils.countElementsPerPartition(DataSet[K]).
  /**
    * Counts the elements held by each partition of a DataSet.
    *
    * Every invocation of the underlying map-partition function emits exactly one
    * `(partitionIndex, elementCount)` tuple:
    *  - `partitionIndex` is the value of `getRuntimeContext.getIndexOfThisSubtask` for the
    *    subtask that processed the partition;
    *  - `elementCount` is the number of elements that invocation received.
    *
    * NOTE(review): Flink presumably invokes mapPartition for empty partitions too (yielding
    * a count of 0) — confirm against the Flink version in use.
    *
    * @param drmDataSet the DataSet whose partitions are counted
    * @tparam K element type of `drmDataSet`; elements are only counted, never inspected
    * @return a DataSet of `(partitionIndex, elementCount)` tuples, one per
    *         map-partition invocation
    */
  private[mahout] def countsPerPartition[K](drmDataSet: DataSet[K]): DataSet[(Int, Int)] = {
    drmDataSet.mapPartition {
      new RichMapPartitionFunction[K, (Int, Int)] {
        // The parameter type is spelled out as java.lang.Iterable so it cannot be confused
        // with scala.collection.Iterable, which the file's `scala.collection._` import exposes.
        override def mapPartition(
            iterable: java.lang.Iterable[K],
            collector: Collector[(Int, Int)]): Unit = {
          import scala.collection.JavaConverters._

          // Walk the partition's elements once to count them. Note that
          // `Iterator(iterable)` would build a one-element iterator holding the Iterable
          // itself, so its size would always be 1 regardless of the partition's contents.
          val count: Int = iterable.asScala.size
          val index: Int = getRuntimeContext.getIndexOfThisSubtask
          collector.collect((index, count))
        }
      }
    }
  }
}
\ No newline at end of file

Reply via email to