[ 
https://issues.apache.org/jira/browse/SPARK-8048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14579018#comment-14579018
 ] 

Madhusudanan Kandasamy commented on SPARK-8048:
-----------------------------------------------

Whenever an RDD action submits a job, the DAG scheduler splits the job into
multiple tasks based on the number of partitions. So using a new partitioner
with zero partitions is equivalent to dropping all the data in the RDD.
Therefore, I think it's fair to expect that, when rdd2 has zero partitions,
rdd1.leftOuterJoin(rdd2) returns the same result as a join with an ordinary
empty rdd2, as shown in the code snippet below.

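// Run in spark-shell, where `sc` is predefined.
import org.apache.spark.HashPartitioner

val baseRDD   = sc.parallelize(1 to 3).map(x => (x, x))
// Empty RDD explicitly partitioned with a zero-partition HashPartitioner.
val emptyrdd1 = sc.parallelize(List[(Int, Int)]()).partitionBy(new HashPartitioner(0))
// Ordinary empty RDD with no partitioner.
val emptyrdd2 = sc.parallelize(List[(Int, Int)]())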

println("result 1 = " + baseRDD.leftOuterJoin(emptyrdd1).collect().toList) 
println("result 2 = " + baseRDD.leftOuterJoin(emptyrdd2).collect().toList)

result 1 = List() 
result 2 = List((1,(1,None)), (3,(3,None)), (2,(2,None)))
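
To make the argument concrete without a cluster, here is a minimal, Spark-free sketch in plain Scala. It assumes a simplified model in which one task runs per partition index and keys are assigned with a hash-modulo rule similar in spirit to HashPartitioner; PartitionedLeftOuterJoinModel and its methods are hypothetical names for illustration, not Spark APIs. With zero partitions the per-partition loop never runs, so even a left outer join produces nothing.

```scala
// Hypothetical, Spark-free model: one "task" per partition index.
object PartitionedLeftOuterJoinModel {
  // Hash-modulo assignment, similar in spirit to HashPartitioner
  // (non-negative remainder of key.hashCode); only called when numPartitions > 0.
  def partitionOf(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Left outer join evaluated partition by partition, as separate tasks would.
  def leftOuterJoin[K, V, W](
      left: Seq[(K, V)],
      right: Seq[(K, W)],
      numPartitions: Int): Seq[(K, (V, Option[W]))] =
    (0 until numPartitions).flatMap { p =>
      // Records of each side that hash to partition p.
      val leftPart = left.filter { case (k, _) => partitionOf(k, numPartitions) == p }
      val rightPart = right.filter { case (k, _) => partitionOf(k, numPartitions) == p }
      leftPart.flatMap { case (k, v) =>
        val matches: Seq[W] = rightPart.collect { case (rk, w) if rk == k => w }
        // Unmatched left keys are kept with None, as in a left outer join.
        if (matches.isEmpty) Seq((k, (v, Option.empty[W])))
        else matches.map(w => (k, (v, Some(w): Option[W])))
      }
    }

  def main(args: Array[String]): Unit = {
    val base = (1 to 3).map(x => (x, x))
    val empty = Seq.empty[(Int, Int)]
    // Zero partitions: the loop body never runs, so nothing is produced.
    println(leftOuterJoin(base, empty, 0))
    // Two partitions: every left key survives, paired with None.
    println(leftOuterJoin(base, empty, 2))
  }
}
```

The modulo is never evaluated with a zero divisor in this model because there is no partition index to iterate over, which loosely mirrors a job over zero partitions launching no tasks.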

I have the following questions for the committers:

1. Is this expectation correct, or should we consider this working as
designed?
2. If we decide to fix this:
        1. Can I work on a PR for this issue?
        2. Please share your comments on the fix below.

If this expectation is correct, one way to fix it would be to modify the
partitioner selection code used by cogroup-based operations such as joins
(Partitioner.defaultPartitioner). Currently, it sorts the input RDDs by
partition count and returns the existing partitioner of the largest RDD that
has one defined; only if no input has a partitioner does it create a new
HashPartitioner. We can fix this by reusing an existing partitioner only when
its RDD has more than 0 partitions.

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index e53a78e..7052f8e 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -57,7 +57,7 @@ object Partitioner {
   def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
     val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
     for (r <- bySize if r.partitioner.isDefined) {
-      return r.partitioner.get
+      if (r.partitions.size > 0) return r.partitioner.get
     }
     if (rdd.context.conf.contains("spark.default.parallelism")) {
       new HashPartitioner(rdd.context.defaultParallelism)
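
The selection rule and the proposed change can also be modeled in plain Scala to check which partitioner wins. This is a sketch under stated assumptions: ModelRDD, ModelPartitioner and DefaultPartitionerModel are hypothetical stand-ins, not Spark classes, and the final fallback (no usable partitioner and spark.default.parallelism unset) is assumed to size the new partitioner by the largest input, since that branch lies outside the diff context.

```scala
// Hypothetical stand-ins; these are NOT Spark classes.
final case class ModelPartitioner(numPartitions: Int)
final case class ModelRDD(numPartitions: Int, partitioner: Option[ModelPartitioner])

object DefaultPartitionerModel {
  // Models Partitioner.defaultPartitioner as quoted in the diff.
  // defaultParallelism = Some(n) models spark.default.parallelism being set.
  // skipEmpty = true models the proposed `r.partitions.size > 0` check.
  def choose(
      rdds: Seq[ModelRDD],
      defaultParallelism: Option[Int],
      skipEmpty: Boolean): ModelPartitioner = {
    require(rdds.nonEmpty, "at least one RDD is required")
    // Largest RDD first, as in `sortBy(_.partitions.size).reverse`.
    val bySize = rdds.sortBy(_.numPartitions).reverse
    // First RDD (by size) whose existing partitioner is eligible for reuse.
    val reusable = bySize.find(r => r.partitioner.isDefined && (!skipEmpty || r.numPartitions > 0))
    reusable.flatMap(_.partitioner).getOrElse {
      // Assumed fallback: default parallelism if set, else the largest input's size.
      ModelPartitioner(defaultParallelism.getOrElse(bySize.head.numPartitions))
    }
  }
}
```

With a 4-partition base RDD and an empty RDD carrying a 0-partition partitioner, the unpatched rule (skipEmpty = false) picks the 0-partition partitioner, while the patched rule falls back to 4 partitions, or to spark.default.parallelism when it is set. A non-empty existing partitioner is still reused even when it has fewer partitions than the largest input.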



> Explicit partitionning of an RDD with 0 partition will yield empty outer join
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-8048
>                 URL: https://issues.apache.org/jira/browse/SPARK-8048
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.3.1
>            Reporter: Olivier Toupin
>            Priority: Minor
>
> Check this code =>
> https://gist.github.com/anonymous/0f935915f2bc182841f0
> Because of this => {{.partitionBy(new HashPartitioner(0))}}
> The join will return an empty result.
> Here the normally expected behaviour would be for the join to crash, raise an
> error, or return unjoined results, but instead it yields an empty RDD.
> This is a trivial example, but imagine:
> {{.partitionBy(new HashPartitioner(previous.partitions.length))}}.
> If you join on an empty "previous" RDD, the lookup table is empty, and Spark
> will make you lose all your results instead of returning unjoined results,
> without any warning or error.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
