Repository: spark Updated Branches: refs/heads/master 81a152c54 -> c399baa0f
SPARK-1456 Remove view bounds on Ordered in favor of a context bound on Ordering. This avoids creating a new Ordered wrapper object per row, because a single Ordering instance is reused for all comparisons. Additionally, [view bounds are going to be deprecated](https://issues.scala-lang.org/browse/SI-7629), so we should get rid of them while APIs are still flexible. Author: Michael Armbrust <[email protected]> Closes #410 from marmbrus/viewBounds and squashes the following commits: c574221 [Michael Armbrust] fix example. 812008e [Michael Armbrust] Update Java API. 1b9b85c [Michael Armbrust] Update scala doc. 35798a8 [Michael Armbrust] Remove view bounds on Ordered in favor of a context bound on Ordering. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c399baa0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c399baa0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c399baa0 Branch: refs/heads/master Commit: c399baa0fc40be7aa51835aaeadcd5d768dfdb95 Parents: 81a152c Author: Michael Armbrust <[email protected]> Authored: Fri Apr 18 12:04:13 2014 -0700 Committer: Reynold Xin <[email protected]> Committed: Fri Apr 18 12:04:13 2014 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/Partitioner.scala | 8 +++--- .../scala/org/apache/spark/SparkContext.scala | 2 +- .../org/apache/spark/api/java/JavaPairRDD.scala | 10 ++------ .../apache/spark/rdd/OrderedRDDFunctions.scala | 26 ++++++++++++++++---- .../org/apache/spark/util/CollectionsUtil.scala | 2 +- 5 files changed, 30 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c399baa0/core/src/main/scala/org/apache/spark/Partitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index ad99882..9155159 
100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -89,12 +89,14 @@ class HashPartitioner(partitions: Int) extends Partitioner { * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly * equal ranges. The ranges are determined by sampling the content of the RDD passed in. */ -class RangePartitioner[K <% Ordered[K]: ClassTag, V]( +class RangePartitioner[K : Ordering : ClassTag, V]( partitions: Int, @transient rdd: RDD[_ <: Product2[K,V]], private val ascending: Boolean = true) extends Partitioner { + private val ordering = implicitly[Ordering[K]] + // An array of upper bounds for the first (partitions - 1) partitions private val rangeBounds: Array[K] = { if (partitions == 1) { @@ -103,7 +105,7 @@ class RangePartitioner[K <% Ordered[K]: ClassTag, V]( val rddSize = rdd.count() val maxSampleSize = partitions * 20.0 val frac = math.min(maxSampleSize / math.max(rddSize, 1), 1.0) - val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sortWith(_ < _) + val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sorted if (rddSample.length == 0) { Array() } else { @@ -126,7 +128,7 @@ class RangePartitioner[K <% Ordered[K]: ClassTag, V]( var partition = 0 if (rangeBounds.length < 1000) { // If we have less than 100 partitions naive search - while (partition < rangeBounds.length && k > rangeBounds(partition)) { + while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) { partition += 1 } } else { http://git-wip-us.apache.org/repos/asf/spark/blob/c399baa0/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ee56373..d3ef75b 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ 
b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1273,7 +1273,7 @@ object SparkContext extends Logging { rdd: RDD[(K, V)]) = new SequenceFileRDDFunctions(rdd) - implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag]( + implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag]( rdd: RDD[(K, V)]) = new OrderedRDDFunctions[K, V, (K, V)](rdd) http://git-wip-us.apache.org/repos/asf/spark/blob/c399baa0/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index e5b2c8a..b3ec270 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -626,10 +626,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * order of the keys). */ def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = { - class KeyOrdering(val a: K) extends Ordered[K] { - override def compare(b: K) = comp.compare(a, b) - } - implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x) + implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering. fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending)) } @@ -640,10 +637,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * order of the keys). */ def sortByKey(comp: Comparator[K], ascending: Boolean, numPartitions: Int): JavaPairRDD[K, V] = { - class KeyOrdering(val a: K) extends Ordered[K] { - override def compare(b: K) = comp.compare(a, b) - } - implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x) + implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering. 
fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending, numPartitions)) } http://git-wip-us.apache.org/repos/asf/spark/blob/c399baa0/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala index d5691f2..6a3f698 100644 --- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala @@ -24,15 +24,31 @@ import org.apache.spark.{Logging, RangePartitioner} /** * Extra functions available on RDDs of (key, value) pairs where the key is sortable through * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to - * use these functions. They will work with any key type that has a `scala.math.Ordered` - * implementation. + * use these functions. They will work with any key type `K` that has an implicit `Ordering[K]` in + * scope. Ordering objects already exist for all of the standard primitive types. Users can also + * define their own orderings for custom types, or to override the default ordering. The implicit + * ordering that is in the closest scope will be used. + * + * {{{ + * import org.apache.spark.SparkContext._ + * + * val rdd: RDD[(String, Int)] = ... + * implicit val caseInsensitiveOrdering = new Ordering[String] { + * override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase) + * } + * + * // Sort by key, using the above case insensitive ordering. 
+ * rdd.sortByKey() + * }}} */ -class OrderedRDDFunctions[K <% Ordered[K]: ClassTag, +class OrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag, P <: Product2[K, V] : ClassTag]( self: RDD[P]) extends Logging with Serializable { + private val ordering = implicitly[Ordering[K]] + /** * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling * `collect` or `save` on the resulting RDD will return or output an ordered list of records @@ -45,9 +61,9 @@ class OrderedRDDFunctions[K <% Ordered[K]: ClassTag, shuffled.mapPartitions(iter => { val buf = iter.toArray if (ascending) { - buf.sortWith((x, y) => x._1 < y._1).iterator + buf.sortWith((x, y) => ordering.lt(x._1, y._1)).iterator } else { - buf.sortWith((x, y) => x._1 > y._1).iterator + buf.sortWith((x, y) => ordering.gt(x._1, y._1)).iterator } }, preservesPartitioning = true) } http://git-wip-us.apache.org/repos/asf/spark/blob/c399baa0/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala b/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala index 9323503..e4c254b 100644 --- a/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala +++ b/core/src/main/scala/org/apache/spark/util/CollectionsUtil.scala @@ -23,7 +23,7 @@ import scala.Array import scala.reflect._ private[spark] object CollectionsUtils { - def makeBinarySearch[K <% Ordered[K] : ClassTag] : (Array[K], K) => Int = { + def makeBinarySearch[K : Ordering : ClassTag] : (Array[K], K) => Int = { classTag[K] match { case ClassTag.Float => (l, x) => util.Arrays.binarySearch(l.asInstanceOf[Array[Float]], x.asInstanceOf[Float])
