Repository: spark
Updated Branches:
  refs/heads/branch-1.3 81de30ae5 -> d13080aa2


[SPARK-7103] Fix crash with SparkContext.union when RDD has no partitioner

Added a guard to the SparkContext.union method to verify that a partitioner is
defined on all RDDs before instantiating a PartitionerAwareUnionRDD.
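
The root cause is visible in plain Scala, with no Spark involved: flatMap over
a sequence of Options drops every None, so a "single distinct partitioner"
test cannot distinguish "all RDDs share one partitioner" from "some RDDs have
that partitioner and the rest have none". In this illustrative sketch the
string "hash" merely stands in for a real Partitioner instance:

    // Stand-in for rdds.map(_.partitioner): one partitioned RDD, one without.
    val partitioners: Seq[Option[String]] = Seq(Some("hash"), None)

    partitioners.flatten.toSet.size   // 1 -- the None silently disappears
    partitioners.forall(_.isDefined)  // false -- the added guard catches this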

Author: Steven She <ste...@canopylabs.com>

Closes #5679 from stevencanopy/SPARK-7103 and squashes the following commits:

5a3d846 [Steven She] SPARK-7103: Fix crash with SparkContext.union when at least one RDD has no partitioner

(cherry picked from commit b9de9e040aff371c6acf9b3f3d1ff8b360c0cd56)
Signed-off-by: Sean Owen <so...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d13080aa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d13080aa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d13080aa

Branch: refs/heads/branch-1.3
Commit: d13080aa24106a348d3d1e2b8a788292d5915f21
Parents: 81de30a
Author: Steven She <ste...@canopylabs.com>
Authored: Mon Apr 27 18:55:02 2015 -0400
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Apr 27 18:55:15 2015 -0400

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  2 +-
 .../spark/rdd/PartitionerAwareUnionRDD.scala    |  1 +
 .../scala/org/apache/spark/rdd/RDDSuite.scala   | 21 ++++++++++++++++++++
 3 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d13080aa/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 495227b..66426f3 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -972,7 +972,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   /** Build the union of a list of RDDs. */
   def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = {
     val partitioners = rdds.flatMap(_.partitioner).toSet
-    if (partitioners.size == 1) {
+    if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
       new PartitionerAwareUnionRDD(this, rdds)
     } else {
       new UnionRDD(this, rdds)
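
As a usage sketch (assuming a live SparkContext named sc, e.g. in spark-shell;
this mirrors the new tests below rather than adding anything to the patch),
the corrected dispatch behaves as follows:

    import org.apache.spark.HashPartitioner

    // Mixed inputs now fall back to a plain UnionRDD; the partitioner-aware
    // path is taken only when every input RDD has a partitioner defined.
    val withPart = sc.parallelize(Seq(1 -> true)).partitionBy(new HashPartitioner(1))
    val noPart   = sc.parallelize(Seq(2 -> true))

    sc.union(noPart, withPart)    // UnionRDD: one input has no partitioner
    sc.union(withPart, withPart)  // PartitionerAwareUnionRDD: all partitioned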

http://git-wip-us.apache.org/repos/asf/spark/blob/d13080aa/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index 92b0641..7598ff6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -60,6 +60,7 @@ class PartitionerAwareUnionRDD[T: ClassTag](
     var rdds: Seq[RDD[T]]
   ) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
   require(rdds.length > 0)
+  require(rdds.forall(_.partitioner.isDefined))
   require(rdds.flatMap(_.partitioner).toSet.size == 1,
     "Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d13080aa/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index bede1ff..b5f4a19 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -99,6 +99,27 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
   }
 
+  test("SparkContext.union creates UnionRDD if at least one RDD has no 
partitioner") {
+    val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new 
HashPartitioner(1))
+    val rddWithNoPartitioner = sc.parallelize(Seq(2->true))
+    val unionRdd = sc.union(rddWithNoPartitioner, rddWithPartitioner)
+    assert(unionRdd.isInstanceOf[UnionRDD[_]])
+  }
+
+  test("SparkContext.union creates PartitionAwareUnionRDD if all RDDs have 
partitioners") {
+    val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new 
HashPartitioner(1))
+    val unionRdd = sc.union(rddWithPartitioner, rddWithPartitioner)
+    assert(unionRdd.isInstanceOf[PartitionerAwareUnionRDD[_]])
+  }
+
+  test("PartitionAwareUnionRDD raises exception if at least one RDD has no 
partitioner") {
+    val rddWithPartitioner = sc.parallelize(Seq(1->true)).partitionBy(new 
HashPartitioner(1))
+    val rddWithNoPartitioner = sc.parallelize(Seq(2->true))
+    intercept[IllegalArgumentException] {
+      new PartitionerAwareUnionRDD(sc, Seq(rddWithNoPartitioner, 
rddWithPartitioner))
+    }
+  }
+
   test("partitioner aware union") {
     def makeRDDWithPartitioner(seq: Seq[Int]) = {
       sc.makeRDD(seq, 1)

