[ 
https://issues.apache.org/jira/browse/SPARK-1931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14010340#comment-14010340
 ] 

Ankur Dave edited comment on SPARK-1931 at 5/27/14 9:47 PM:
------------------------------------------------------------

Since the fix didn't make it into Spark 1.0.0, a workaround is to partition the 
edges before constructing the graph, as follows:

{code}
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._

def partitionBy[ED](edges: RDD[Edge[ED]], partitionStrategy: 
PartitionStrategy): RDD[Edge[ED]] = {
  val numPartitions = edges.partitions.size
  edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, 
numPartitions), e))
    .partitionBy(new HashPartitioner(numPartitions))
    .mapPartitions(_.map(_._2), preservesPartitioning = true)
}

val g = Graph(
  sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
  sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
assert(g.triplets.collect.map(_.toTuple).toSet ==
  Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))

val gPart = Graph(
  sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
  partitionBy(sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2), 
PartitionStrategy.EdgePartition2D))
assert(gPart.triplets.collect.map(_.toTuple).toSet ==
  Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
{code}


was (Author: ankurd):
Since the fix didn't make it into Spark 1.0.0, a workaround is to partition the 
edges before constructing the graph as follows:

{code}
import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._

def partitionBy[ED](edges: RDD[Edge[ED]], partitionStrategy: 
PartitionStrategy): RDD[Edge[ED]] = {
  val numPartitions = edges.partitions.size
  edges.map(e => (partitionStrategy.getPartition(e.srcId, e.dstId, 
numPartitions), e))
    .partitionBy(new HashPartitioner(numPartitions))
    .mapPartitions(_.map(_._2), preservesPartitioning = true)
}

val g = Graph(
  sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
  sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
assert(g.triplets.collect.map(_.toTuple).toSet ==
  Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))

val gPart = Graph(
  sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
  partitionBy(sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2), 
PartitionStrategy.EdgePartition2D))
assert(gPart.triplets.collect.map(_.toTuple).toSet ==
  Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
{code}

> Graph.partitionBy does not reconstruct routing tables
> -----------------------------------------------------
>
>                 Key: SPARK-1931
>                 URL: https://issues.apache.org/jira/browse/SPARK-1931
>             Project: Spark
>          Issue Type: Bug
>          Components: GraphX
>    Affects Versions: 1.0.0
>            Reporter: Ankur Dave
>            Assignee: Ankur Dave
>             Fix For: 1.0.1
>
>
> Commit 905173df57b90f90ebafb22e43f55164445330e6 introduced a bug in 
> partitionBy where, after repartitioning the edges, it reuses the VertexRDD 
> without updating the routing tables to reflect the new edge layout. This 
> causes the following test to fail:
> {code}
>       import org.apache.spark.graphx._
>       val g = Graph(
>         sc.parallelize(List((0L, "a"), (1L, "b"), (2L, "c"))),
>         sc.parallelize(List(Edge(0L, 1L, 1), Edge(0L, 2L, 1)), 2))
>       assert(g.triplets.collect.map(_.toTuple).toSet ==
>         Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
>       val gPart = g.partitionBy(PartitionStrategy.EdgePartition2D)
>       assert(gPart.triplets.collect.map(_.toTuple).toSet ==
>         Set(((0L, "a"), (1L, "b"), 1), ((0L, "a"), (2L, "c"), 1)))
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to