Ankur Dave created SPARK-1955:
---------------------------------
Summary: VertexRDD can incorrectly assume index sharing
Key: SPARK-1955
URL: https://issues.apache.org/jira/browse/SPARK-1955
Project: Spark
Issue Type: Bug
Components: GraphX
Affects Versions: 0.9.0, 1.0.0, 0.9.1
Reporter: Ankur Dave
Assignee: Ankur Dave
Priority: Minor
Many VertexRDD operations (diff, leftJoin, innerJoin) can use a fast zip join
if both operands are VertexRDDs sharing the same index (i.e., one operand is
derived from the other). However, the check for this case only matches on the
operand's type: the fast join strategy is used whenever the other operand is a
VertexRDD.
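A minimal Scala sketch of the type-only check, using hypothetical stand-in types (`DispatchSketch`, `VertexLike`, and `PlainPairs` are illustrative names, not GraphX classes). It shows that matching on the operand's type alone selects the zip join even when the partition count or partitioner differs, because neither is consulted:

{code:scala}
// Hypothetical model of the dispatch rule described above; not GraphX code.
object DispatchSketch {
  sealed trait Operand
  // Stand-in for a VertexRDD: records its partition count and partitioner name.
  final case class VertexLike(numPartitions: Int, partitionerName: String) extends Operand
  // Stand-in for any other RDD of (vertex id, value) pairs.
  case object PlainPairs extends Operand

  // Type-only check: any VertexLike operand gets the fast zip join,
  // regardless of its numPartitions or partitionerName.
  def strategyByType(other: Operand): String = other match {
    case _: VertexLike => "zip join"
    case PlainPairs    => "shuffle join"
  }
}
{code}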
When the two VertexRDDs have the same partitioner but different indexes, this
is fine, because each VertexPartition will detect the index mismatch and fall
back to the slow but correct local join strategy.
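The per-partition fallback can be pictured with a small, self-contained Scala model. `VIndex` and `VPart` are hypothetical simplifications of a partition's index and values, not the real `VertexPartition` API, and using reference equality (`eq`) to decide whether two partitions share an index is an assumption of the sketch:

{code:scala}
// Hypothetical, simplified model of per-partition joins; not Spark's VertexPartition API.
object IndexCheckSketch {
  // An index assigns each vertex id a fixed slot in the partition's value vector.
  final class VIndex(val ids: Vector[Long])

  // A partition refers to an index and stores an optional value per slot.
  final case class VPart(index: VIndex, values: Vector[Option[Int]]) {
    def entries: List[(Long, Int)] =
      index.ids.zip(values).collect { case (id, Some(v)) => (id, v) }.toList

    def innerJoin(other: VPart)(f: (Long, Int, Int) => Int): VPart =
      if (index eq other.index) {
        // Fast path: same index object, so slot i refers to the same vertex on both sides.
        val joined = values.zip(other.values).zipWithIndex.map {
          case ((Some(x), Some(y)), i) => Some(f(index.ids(i), x, y))
          case _                       => None
        }
        VPart(index, joined)
      } else {
        // Slow but correct path: look up each vertex of `other` by id instead of by slot.
        val otherById = other.entries.toMap
        val joined = index.ids.zip(values).map {
          case (id, Some(x)) => otherById.get(id).map(y => f(id, x, y))
          case _             => None
        }
        VPart(index, joined)
      }
  }
}
{code}

Both join paths yield `(1L, 7)` when a partition holding vertices 0 and 1 is joined with one holding vertex 1: the shared-index case zips slots directly, while the mismatched-index case pays for a lookup by ID but stays correct. This fallback only helps because the two partitions being joined hold the same vertex IDs in the first place, which is what equal partitioners guarantee.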
However, when they have different numbers of partitions or different partition
functions, the zip join pairs up partitions that do not hold the same vertex
IDs, which can raise an exception or even silently produce incorrect results.
For example:
{code}
import org.apache.spark.graphx._
// Construct VertexRDDs with different numbers of partitions
val a = VertexRDD(sc.parallelize(List((0L, 1), (1L, 2)), 1))
val b = VertexRDD(sc.parallelize(List((0L, 5)), 8))
// Try to join them. This appears to work (innerJoin is lazy)...
val c = a.innerJoin(b) { (vid, x, y) => x + y }
// ... but collect then fails with java.lang.IllegalArgumentException:
// Can't zip RDDs with unequal numbers of partitions
c.collect
{code}
{code}
import org.apache.spark._
import org.apache.spark.graphx._
// Construct VertexRDDs with different partition functions
val a = VertexRDD(sc.parallelize(List((0L, 1), (1L, 2))).partitionBy(new HashPartitioner(2)))
val bVerts = sc.parallelize(List((1L, 5)))
val b = VertexRDD(bVerts.partitionBy(new RangePartitioner(2, bVerts)))
// Try to join them. We expect (1L, 7).
val c = a.innerJoin(b) { (vid, x, y) => x + y }
// Silent failure: we get an empty set!
c.collect
{code}
VertexRDD should check equality of partitioners before using the fast zip join.
If the partitioners are different, the two datasets should be automatically
co-partitioned.
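A sketch of the proposed check, as a self-contained Scala model rather than a patch against GraphX. `KeyPartitioner`, `ModPartitioner`, `ThresholdPartitioner`, `Dataset`, and `partitionBy` here are hypothetical names; case-class equality stands in for comparing the two partitioners, and re-bucketing every entry stands in for a shuffle:

{code:scala}
// Hypothetical model of "compare partitioners, co-partition if they differ"; not GraphX code.
object CoPartitionSketch {
  sealed trait KeyPartitioner {
    def numPartitions: Int
    def partition(id: Long): Int
  }
  // Non-negative modulo, similar in spirit to hash partitioning on Long ids.
  final case class ModPartitioner(numPartitions: Int) extends KeyPartitioner {
    def partition(id: Long): Int = (((id % numPartitions) + numPartitions) % numPartitions).toInt
  }
  // Two buckets split at a fixed bound, similar in spirit to range partitioning.
  final case class ThresholdPartitioner(bound: Long) extends KeyPartitioner {
    val numPartitions: Int = 2
    def partition(id: Long): Int = if (id <= bound) 0 else 1
  }

  final case class Dataset(parts: Vector[Map[Long, Int]], partitioner: KeyPartitioner)

  // Bucket (id, value) pairs according to partitioner p.
  def build(data: Seq[(Long, Int)], p: KeyPartitioner): Dataset = {
    val grouped = data.groupBy { case (id, _) => p.partition(id) }
    Dataset(Vector.tabulate(p.numPartitions)(i => grouped.getOrElse(i, Nil).toMap), p)
  }

  // Re-bucket d under p, unless it already uses an equal partitioner.
  def partitionBy(d: Dataset, p: KeyPartitioner): Dataset =
    if (d.partitioner == p) d else build(d.parts.flatten, p)

  // Co-partition first; only then is it safe to zip partitions by position and join locally.
  def innerJoin(a: Dataset, b: Dataset)(f: (Long, Int, Int) => Int): Map[Long, Int] = {
    val aligned = partitionBy(b, a.partitioner)
    a.parts.zip(aligned.parts).flatMap { case (pa, pb) =>
      pa.collect { case (id, x) if pb.contains(id) => id -> f(id, x, pb(id)) }
    }.toMap
  }
}
{code}

With partitioners that disagree (mirroring the second example), this join returns `Map(1L -> 7)` instead of an empty result, and a 1-partition dataset joined with an 8-partition one is re-bucketed rather than failing the zip.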