Repository: spark
Updated Branches:
  refs/heads/master 48866f789 -> 5e6ad24ff


[MLlib] SPARK-5954: Top by key

This PR implements two functions
  - `topByKey(num: Int): RDD[(K, Array[V])]` finds the top-k values for each 
key in a pair RDD. This can be used, e.g., in computing top recommendations.

- `takeOrderedByKey(num: Int): RDD[(K, Array[V])]` does the opposite of 
`topByKey`

The `sorted` call is used here because the `toArray` method of the PriorityQueue 
does not necessarily return a sorted array.

Author: Shuo Xiang <[email protected]>

Closes #5075 from coderxiang/topByKey and squashes the following commits:

1611c37 [Shuo Xiang] code clean up
6f565c0 [Shuo Xiang] naming
a80e0ec [Shuo Xiang] typo and warning
82dded9 [Shuo Xiang] Merge remote-tracking branch 'upstream/master' into 
topByKey
d202745 [Shuo Xiang] move to MLPairRDDFunctions
901b0af [Shuo Xiang] style check
70c6e35 [Shuo Xiang] remove takeOrderedByKey, update doc and test
0895c17 [Shuo Xiang] Merge remote-tracking branch 'upstream/master' into 
topByKey
b10e325 [Shuo Xiang] Merge remote-tracking branch 'upstream/master' into 
topByKey
debccad [Shuo Xiang] topByKey


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5e6ad24f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5e6ad24f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5e6ad24f

Branch: refs/heads/master
Commit: 5e6ad24ff645a9b0f63d9c0f17193550963aa0a7
Parents: 48866f7
Author: Shuo Xiang <[email protected]>
Authored: Fri Mar 20 14:45:44 2015 -0400
Committer: Xiangrui Meng <[email protected]>
Committed: Fri Mar 20 14:45:44 2015 -0400

----------------------------------------------------------------------
 .../spark/mllib/rdd/MLPairRDDFunctions.scala    | 60 ++++++++++++++++++++
 .../mllib/rdd/MLPairRDDFunctionsSuite.scala     | 36 ++++++++++++
 2 files changed, 96 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5e6ad24f/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala
new file mode 100644
index 0000000..9213fd3
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.BoundedPriorityQueue
+
+/**
+ * Machine learning specific Pair RDD functions.
+ */
+@DeveloperApi
/**
 * Machine learning specific Pair RDD functions.
 */
@DeveloperApi
class MLPairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) extends Serializable {

  /**
   * Returns the top k (largest) elements for each key from this RDD, as defined by the
   * specified implicit Ordering[T]. Keys holding fewer than k elements yield all of their
   * elements.
   *
   * @param num k, the number of top elements to return
   * @param ord the implicit ordering for T
   * @return an RDD that contains the top k values for each key
   */
  def topByKey(num: Int)(implicit ord: Ordering[V]): RDD[(K, Array[V])] = {
    // A bounded priority queue keeps only the k largest values seen per key,
    // so each partition ships at most k candidates per key to the reducer.
    val zero = new BoundedPriorityQueue[V](num)(ord)
    val queues = self.aggregateByKey(zero)(
      seqOp = (queue, value) => queue += value,
      combOp = (left, right) => left ++= right
    )
    // The queue's toArray gives no ordering guarantee; sort descending explicitly.
    queues.mapValues(_.toArray.sorted(ord.reverse))
  }
}
+
@DeveloperApi
object MLPairRDDFunctions {

  /**
   * Implicit conversion that enriches a pair RDD with [[MLPairRDDFunctions]],
   * making methods such as `topByKey` available on it.
   */
  implicit def fromPairRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): MLPairRDDFunctions[K, V] = {
    new MLPairRDDFunctions[K, V](rdd)
  }
}

http://git-wip-us.apache.org/repos/asf/spark/blob/5e6ad24f/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala
new file mode 100644
index 0000000..1ac7c12
--- /dev/null
+++ 
b/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
+
class MLPairRDDFunctionsSuite extends FunSuite with MLlibTestSparkContext {
  test("topByKey") {
    // Keys 1, 3, and 5 each carry multiple values; request the top 2 per key.
    val pairs = Array((1, 1), (1, 2), (3, 2), (3, 7), (3, 5), (5, 1), (5, 3))
    val rdd = sc.parallelize(pairs, 2)
    val topMap = rdd.topByKey(2).collectAsMap()

    // Expect one entry per distinct key, each sorted in descending order.
    assert(topMap.size === 3)
    assert(topMap(1) === Array(2, 1))
    assert(topMap(3) === Array(7, 5))
    assert(topMap(5) === Array(3, 1))
  }
}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to