[
https://issues.apache.org/jira/browse/SPARK-1931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14010340#comment-14010340
]
Ankur Dave commented on SPARK-1931:
-----------------------------------
Since the fix didn't make it into Spark 1.0.0, a workaround is to partition the
edges before constructing the graph as follows:
{code}
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._
def partitionBy[ED](edges: RDD[Edge[ED]], partitionStrategy:
PartitionStrategy): RDD[Edge[ED]] = {
val numPartitions = edges.partitions.size
edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId,
numPartitions), e))
.partitionBy(new HashPartitioner(numPartitions))
.mapPartitions(_.map(_._2), preservesPartitioning = true)
}
val g = Graph(
sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
assert(g.triplets.collect.map(_.toTuple).toSet ==
Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
val gPart = Graph(
sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
partitionBy(sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2),
PartitionStrategy.EdgePartition2D))
assert(gPart.triplets.collect.map(_.toTuple).toSet ==
Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
{code}
> Graph.partitionBy does not reconstruct routing tables
> -----------------------------------------------------
>
> Key: SPARK-1931
> URL: https://issues.apache.org/jira/browse/SPARK-1931
> Project: Spark
> Issue Type: Bug
> Components: GraphX
> Affects Versions: 1.0.0
> Reporter: Ankur Dave
> Assignee: Ankur Dave
> Fix For: 1.0.1
>
>
> Commit 905173df57b90f90ebafb22e43f55164445330e6 introduced a bug in
> partitionBy where, after repartitioning the edges, it reuses the VertexRDD
> without updating the routing tables to reflect the new edge layout. This
> causes the following test to fail:
> {code}
> import org.apache.spark.graphx._
> val g = Graph(
> sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
> sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
> assert(g.triplets.collect.map(_.toTuple).toSet ==
> Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
> val gPart = g.partitionBy(PartitionStrategy.EdgePartition2D)
> assert(gPart.triplets.collect.map(_.toTuple).toSet ==
> Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
> {code}
--
This message was sent by Atlassian JIRA
(v6.2#6252)