[ https://issues.apache.org/jira/browse/SPARK-1955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ankur Dave updated SPARK-1955:
------------------------------

    Description: 
Many VertexRDD operations (diff, leftJoin, innerJoin) can use a fast zip join 
if both operands are VertexRDDs sharing the same index (i.e., one operand is 
derived from the other). This check is implemented by matching on the operand 
type and using the fast join strategy if both are VertexRDDs.
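
To make the problem concrete, here is a minimal sketch of type-based dispatch
in plain Scala. `Operand`, `VertexOperand`, `PlainOperand` and `chooseStrategy`
are hypothetical stand-ins, not GraphX classes. The point is only that a match
on the runtime type never looks at how either side is partitioned.

{code}
// Hypothetical, simplified stand-ins for the operand kinds; these are NOT
// the GraphX classes, only enough structure to show the dispatch.
trait Operand { def numPartitions: Int }
final case class VertexOperand(numPartitions: Int) extends Operand
final case class PlainOperand(numPartitions: Int) extends Operand

// Picks a join strategy from the runtime type of `other` alone, mirroring
// the check described above. Neither side's partitioning is consulted.
def chooseStrategy(self: VertexOperand, other: Operand): String = other match {
  case _: VertexOperand => "zip join"     // silently assumes aligned partitions
  case _                => "shuffle join" // general path for any other input
}

// Two vertex operands with 1 vs 8 partitions still take the zip path.
println(chooseStrategy(VertexOperand(1), VertexOperand(8))) // prints "zip join"
{code}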

This is clearly fine when both do in fact share the same index. It is also fine 
when the two VertexRDDs have the same partitioner but different indexes, 
because each VertexPartition will detect the index mismatch and fall back to 
the slow but correct local join strategy.
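
A rough model of that per-partition fallback, assuming a partition is just an
index (vertex id to slot) plus a values array laid out by that index.
`LocalPartition` is hypothetical, and testing index identity with `eq` is an
assumption of this sketch, not a description of the actual VertexPartition
code.

{code}
// Hypothetical model of one vertex partition: an index from vertex id to
// slot, plus a values array laid out by that index. Not the GraphX class.
final class LocalPartition(val index: Map[Long, Int], val values: Array[Int]) {
  def innerJoin(other: LocalPartition)(f: (Long, Int, Int) => Int): Map[Long, Int] =
    if (index eq other.index) {
      // Fast path: same index object, so slot i holds the same vertex in both.
      index.map { case (vid, i) => vid -> f(vid, values(i), other.values(i)) }
    } else {
      // Slow but correct path: resolve each vertex through the other index.
      index.flatMap { case (vid, i) =>
        other.index.get(vid).map(j => vid -> f(vid, values(i), other.values(j)))
      }
    }
}

val shared = Map(0L -> 0, 1L -> 1)
val p = new LocalPartition(shared, Array(1, 2))
val q = new LocalPartition(shared, Array(10, 20))   // derived: same index object
val r = new LocalPartition(Map(1L -> 0), Array(5))  // unrelated index

println(p.innerJoin(q)((vid, x, y) => x + y)) // fast path
println(p.innerJoin(r)((vid, x, y) => x + y)) // fallback path
{code}

Within a single partition, a mismatched index costs speed but not correctness.
The failures below arise because the partitions being paired up do not cover
the same vertex ids.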

However, when they have different numbers of partitions or different partition 
functions, an exception or even silently incorrect results can occur.

For example:

{code}
import org.apache.spark.graphx._
// Construct VertexRDDs with different numbers of partitions
val a = VertexRDD(sc.parallelize(List((0L, 1), (1L, 2)), 1))
val b = VertexRDD(sc.parallelize(List((0L, 5)), 8))
// Try to join them. Appears to work...
val c = a.innerJoin(b) { (vid, x, y) => x + y }
// ... but then fails with java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
c.collect
{code}

{code}
import org.apache.spark._
import org.apache.spark.graphx._
// Construct VertexRDDs with different partition functions
val a = VertexRDD(sc.parallelize(List((0L, 1), (1L, 2))).partitionBy(new HashPartitioner(2)))
val bVerts = sc.parallelize(List((1L, 5)))
val b = VertexRDD(bVerts.partitionBy(new RangePartitioner(2, bVerts)))
// Try to join them. We expect (1L, 7).
val c = a.innerJoin(b) { (vid, x, y) => x + y }
// Silent failure: we get an empty set!
c.collect
{code}
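
Both failures can be reproduced without Spark in a small model, under stated
assumptions: partitions are plain `Map`s, `hashPart` approximates hash
partitioning with a non-negative modulus of the key's hash code, and
`rangePart` assumes a single range bound at 1L, which places key 1L in
partition 0. That is consistent with the empty result above but has not been
checked against RangePartitioner itself. All names are hypothetical.

{code}
// Hypothetical partition functions standing in for Spark's partitioners.
def hashPart(n: Int): Long => Int = vid => java.lang.Math.floorMod(vid.hashCode, n)
// Assumption: a two-way range split with its bound at 1L, so key 1L lands
// in partition 0.
val rangePart: Long => Int = vid => if (vid <= 1L) 0 else 1

// Split (vertex id, value) pairs into n partitions using p.
def partition(data: Seq[(Long, Int)], n: Int, p: Long => Int): Vector[Map[Long, Int]] =
  Vector.tabulate(n)(i => data.filter { case (vid, _) => p(vid) == i }.toMap)

// Zip join: join partition i of a only with partition i of b.
def zipInnerJoin(a: Vector[Map[Long, Int]], b: Vector[Map[Long, Int]]): Map[Long, Int] = {
  require(a.length == b.length, "Can't zip partitions of unequal count")
  a.zip(b).flatMap { case (pa, pb) =>
    pa.collect { case (vid, x) if pb.contains(vid) => vid -> (x + pb(vid)) }
  }.toMap
}

// Different partition functions: key 1L sits in partition 1 of a but in
// partition 0 of b, so the zipped partitions never meet.
val a = partition(Seq(0L -> 1, 1L -> 2), 2, hashPart(2))
val b = partition(Seq(1L -> 5), 2, rangePart)
println(zipInnerJoin(a, b)) // empty, although (1L, 7) is expected

// Different partition counts: require throws IllegalArgumentException.
val failed = scala.util.Try(
  zipInnerJoin(partition(Seq(0L -> 1, 1L -> 2), 1, hashPart(1)),
               partition(Seq(0L -> 5), 8, hashPart(8))))
println(failed.isFailure)
{code}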

VertexRDD should check equality of partitioners before using the fast zip join. 
If the partitioners are different, the two datasets should be automatically 
co-partitioned.
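
A minimal sketch of the proposed behavior, again in a Spark-free model with
hypothetical names (`VPartitioner`, `HashP`, `RangeP2`, `Vertices`, `build`).
It zips only when the two partitioners compare equal and otherwise
repartitions the right-hand operand to match the left. Equality here relies
on case-class structural equality, an assumption standing in for however
real partitioners define `equals`.

{code}
// Hypothetical partitioner model with structural equality; NOT Spark's API.
trait VPartitioner { def numPartitions: Int; def getPartition(vid: Long): Int }
final case class HashP(numPartitions: Int) extends VPartitioner {
  def getPartition(vid: Long): Int = java.lang.Math.floorMod(vid.hashCode, numPartitions)
}
// Assumption: a two-way range split with its bound at 1L.
case object RangeP2 extends VPartitioner {
  val numPartitions = 2
  def getPartition(vid: Long): Int = if (vid <= 1L) 0 else 1
}

// Hypothetical vertex dataset: one Map per partition, tagged with its partitioner.
final case class Vertices(partitioner: VPartitioner, parts: Vector[Map[Long, Int]]) {
  // Redistribute every (vid, value) pair according to p.
  def partitionBy(p: VPartitioner): Vertices = {
    val all = parts.flatten
    Vertices(p, Vector.tabulate(p.numPartitions)(i =>
      all.filter { case (vid, _) => p.getPartition(vid) == i }.toMap))
  }

  def innerJoin(other: Vertices)(f: (Long, Int, Int) => Int): Vertices = {
    // Zip only when the partitioners are equal; otherwise co-partition first.
    val aligned =
      if (other.partitioner == partitioner) other else other.partitionBy(partitioner)
    Vertices(partitioner, parts.zip(aligned.parts).map { case (pa, pb) =>
      pa.collect { case (vid, x) if pb.contains(vid) => vid -> f(vid, x, pb(vid)) }
    })
  }

  def collect: Map[Long, Int] = parts.flatten.toMap
}

// Start from a single unpartitioned chunk, then distribute it with p.
def build(data: Seq[(Long, Int)], p: VPartitioner): Vertices =
  Vertices(p, Vector(data.toMap)).partitionBy(p)

// The second example from the report, now co-partitioned before the zip.
val a = build(Seq(0L -> 1, 1L -> 2), HashP(2))
val b = build(Seq(1L -> 5), RangeP2)
println(a.innerJoin(b)((vid, x, y) => x + y).collect)
{code}

Repartitioning the right-hand side keeps the result partitioned like the left
operand, which is one reasonable choice; shuffling whichever side is smaller
would be an alternative.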

  was:
Many VertexRDD operations (diff, leftJoin, innerJoin) can use a fast zip join 
if both operands are VertexRDDs sharing the same index (i.e., one operand is 
derived from the other). However, this check is implemented by matching on the 
operand type and using the fast join strategy if it is a VertexRDD.

When the two VertexRDDs have the same partitioner but different indexes, this 
is fine, because each VertexPartition will detect the index mismatch and fall 
back to the slow but correct local join strategy.

However, when they have different numbers of partitions or different partition 
functions, an exception or even silently incorrect results can occur.

For example:

{code}
import org.apache.spark.graphx._
// Construct VertexRDDs with different numbers of partitions
val a = VertexRDD(sc.parallelize(List((0L, 1), (1L, 2)), 1))
val b = VertexRDD(sc.parallelize(List((0L, 5)), 8))
// Try to join them. Appears to work...
val c = a.innerJoin(b) { (vid, x, y) => x + y }
// ... but then fails with java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
c.collect
{code}

{code}
import org.apache.spark._
import org.apache.spark.graphx._
// Construct VertexRDDs with different partition functions
val a = VertexRDD(sc.parallelize(List((0L, 1), (1L, 2))).partitionBy(new HashPartitioner(2)))
val bVerts = sc.parallelize(List((1L, 5)))
val b = VertexRDD(bVerts.partitionBy(new RangePartitioner(2, bVerts)))
// Try to join them. We expect (1L, 7).
val c = a.innerJoin(b) { (vid, x, y) => x + y }
// Silent failure: we get an empty set!
c.collect
{code}

VertexRDD should check equality of partitioners before using the fast zip join. 
If the partitioners are different, the two datasets should be automatically 
co-partitioned.


> VertexRDD can incorrectly assume index sharing
> ----------------------------------------------
>
>                 Key: SPARK-1955
>                 URL: https://issues.apache.org/jira/browse/SPARK-1955
>             Project: Spark
>          Issue Type: Bug
>          Components: GraphX
>    Affects Versions: 0.9.0, 1.0.0, 0.9.1
>            Reporter: Ankur Dave
>            Assignee: Ankur Dave
>            Priority: Minor
>



--
This message was sent by Atlassian JIRA
(v6.2#6252)
