Repository: spark Updated Branches: refs/heads/master 6a7e537f3 -> 6cd28cc21
[SPARK-9236] [CORE] Make defaultPartitioner not reuse a parent RDD's partitioner if it has 0 partitions See also comments on https://issues.apache.org/jira/browse/SPARK-9236 Author: François Garillot <franc...@garillot.net> Closes #7616 from huitseeker/issue/SPARK-9236 and squashes the following commits: 217f902 [François Garillot] [SPARK-9236] Make defaultPartitioner not reuse a parent RDD's partitioner if it has 0 partitions Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6cd28cc2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6cd28cc2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6cd28cc2 Branch: refs/heads/master Commit: 6cd28cc21ed585ab8d1e0e7147a1a48b044c9c8e Parents: 6a7e537 Author: François Garillot <franc...@garillot.net> Authored: Fri Jul 24 15:41:13 2015 +0100 Committer: Sean Owen <so...@cloudera.com> Committed: Fri Jul 24 15:41:13 2015 +0100 ---------------------------------------------------------------------- .../scala/org/apache/spark/Partitioner.scala | 2 +- .../spark/rdd/PairRDDFunctionsSuite.scala | 23 ++++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6cd28cc2/core/src/main/scala/org/apache/spark/Partitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index ad68512..4b9d599 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -56,7 +56,7 @@ object Partitioner { */ def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = { val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse - for (r <- bySize if r.partitioner.isDefined) { + for (r <- bySize if 
r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) { return r.partitioner.get } if (rdd.context.conf.contains("spark.default.parallelism")) { http://git-wip-us.apache.org/repos/asf/spark/blob/6cd28cc2/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index dfa102f..1321ec8 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -282,6 +282,29 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { )) } + // See SPARK-9236 + test("cogroup with empty RDD") { + import scala.reflect.classTag + val intPairCT = classTag[(Int, Int)] + + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.emptyRDD[(Int, Int)](intPairCT) + + val joined = rdd1.cogroup(rdd2).collect() + assert(joined.size > 0) + } + + // See SPARK-9236 + test("cogroup with groupByed RDD having 0 partitions") { + import scala.reflect.classTag + val intCT = classTag[Int] + + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.emptyRDD[Int](intCT).groupBy((x) => 5) + val joined = rdd1.cogroup(rdd2).collect() + assert(joined.size > 0) + } + test("rightOuterJoin") { val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org