Repository: spark Updated Branches: refs/heads/master ae253e5a8 -> 46307b2cd
[SPARK-21401][ML][MLLIB] add poll function for BoundedPriorityQueue ## What changes were proposed in this pull request? Most usages of BoundedPriorityQueue in ML/MLlib read the contents of the queue and then sort them. For example, Word2Vec uses pq.toSeq.sortBy(-_._2), and ALS uses pq.toArray.sorted(). Repeatedly calling poll() removes the retained elements one at a time in ascending order (according to the queue's Ordering), which can replace that separate sort; test results show that using pq.poll is much faster than sorting the values. It is therefore worth adding a poll function to BoundedPriorityQueue. ## How was this patch tested? Existing unit tests, plus the new BoundedPriorityQueueSuite added by this patch. Author: Peng <[email protected]> Author: Peng Meng <[email protected]> Closes #18620 from mpjlu/add-poll. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46307b2c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46307b2c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46307b2c Branch: refs/heads/master Commit: 46307b2cd3e504d5cf5c7121b903eaa9c4da4c4b Parents: ae253e5 Author: Peng <[email protected]> Authored: Wed Jul 19 09:56:48 2017 +0100 Committer: Sean Owen <[email protected]> Committed: Wed Jul 19 09:56:48 2017 +0100 ---------------------------------------------------------------------- .../spark/util/BoundedPriorityQueue.scala | 4 ++ .../spark/util/BoundedPriorityQueueSuite.scala | 51 ++++++++++++++++++++ 2 files changed, 55 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/46307b2c/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala index 1b2b193..eff0aa4 100644 --- a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala +++ b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala @@ -51,6 +51,10 @@ private[spark] class 
BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Orderin this } + def poll(): A = { + underlying.poll() + } + override def +=(elem1: A, elem2: A, elems: A*): this.type = { this += elem1 += elem2 ++= elems } http://git-wip-us.apache.org/repos/asf/spark/blob/46307b2c/core/src/test/scala/org/apache/spark/util/BoundedPriorityQueueSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/BoundedPriorityQueueSuite.scala b/core/src/test/scala/org/apache/spark/util/BoundedPriorityQueueSuite.scala new file mode 100644 index 0000000..9465ca7 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/util/BoundedPriorityQueueSuite.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util + +import org.apache.spark.SparkFunSuite + +class BoundedPriorityQueueSuite extends SparkFunSuite { + test("BoundedPriorityQueue poll test") { + val pq = new BoundedPriorityQueue[Double](4) + + pq += 0.1 + pq += 1.5 + pq += 1.0 + pq += 0.3 + pq += 0.01 + + assert(pq.isEmpty == false) + assert(pq.poll() == 0.1) + assert(pq.poll() == 0.3) + assert(pq.poll() == 1.0) + assert(pq.poll() == 1.5) + assert(pq.isEmpty == true) + + val pq2 = new BoundedPriorityQueue[(Int, Double)](4)(Ordering.by(_._2)) + pq2 += 1 -> 0.5 + pq2 += 5 -> 0.1 + pq2 += 3 -> 0.3 + pq2 += 4 -> 0.2 + pq2 += 1 -> 0.4 + + assert(pq2.poll()._2 == 0.2) + assert(pq2.poll()._2 == 0.3) + assert(pq2.poll()._2 == 0.4) + assert(pq2.poll()._2 == 0.5) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
