Repository: spark Updated Branches: refs/heads/master e8d93ee52 -> 5284ca78d
Enable repartitioning of a graph over a different number of partitions. It is currently very difficult to repartition a graph over a different number of partitions. This PR adds an additional `partitionBy` function that takes the number of partitions. Author: Joseph E. Gonzalez <joseph.e.gonza...@gmail.com> Closes #719 from jegonzal/graph_partitioning_options and squashes the following commits: 730b405 [Joseph E. Gonzalez] adding an additional number of partitions option to partitionBy Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5284ca78 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5284ca78 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5284ca78 Branch: refs/heads/master Commit: 5284ca78d17fb4de9a7019f3bbecf86484c13763 Parents: e8d93ee Author: Joseph E. Gonzalez <joseph.e.gonza...@gmail.com> Authored: Tue Jun 3 20:49:14 2014 -0700 Committer: Ankur Dave <ankurd...@gmail.com> Committed: Tue Jun 3 20:49:14 2014 -0700 ---------------------------------------------------------------------- graphx/src/main/scala/org/apache/spark/graphx/Graph.scala | 10 ++++++++++ .../scala/org/apache/spark/graphx/PartitionStrategy.scala | 8 +++++--- .../scala/org/apache/spark/graphx/impl/GraphImpl.scala | 6 +++++- 3 files changed, 20 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5284ca78/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index c4f9d65..14ae50e 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -106,10 +106,20 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends 
Serializab /** * Repartitions the edges in the graph according to `partitionStrategy`. + * + * @param partitionStrategy the partitioning strategy to use when partitioning the edges in the graph. */ def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] /** + * Repartitions the edges in the graph according to `partitionStrategy`. + * + * @param partitionStrategy the partitioning strategy to use when partitioning the edges in the graph. + * @param numPartitions the number of edge partitions in the new graph. + */ + def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED] + + /** * Transforms each vertex attribute in the graph using the map function. * * @note The new graph has the same structure. As a consequence the underlying index structures http://git-wip-us.apache.org/repos/asf/spark/blob/5284ca78/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala index ef412cf..5e7e72a 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala @@ -114,9 +114,11 @@ object PartitionStrategy { */ case object CanonicalRandomVertexCut extends PartitionStrategy { override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { - val lower = math.min(src, dst) - val higher = math.max(src, dst) - math.abs((lower, higher).hashCode()) % numParts + if (src < dst) { + math.abs((src, dst).hashCode()) % numParts + } else { + math.abs((dst, src).hashCode()) % numParts + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/5284ca78/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala ---------------------------------------------------------------------- diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 59d9a88..15ea05c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -74,7 +74,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( } override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = { - val numPartitions = edges.partitions.size + partitionBy(partitionStrategy, edges.partitions.size) + } + + override def partitionBy( + partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED] = { val edTag = classTag[ED] val vdTag = classTag[VD] val newEdges = edges.withPartitionsRDD(edges.map { e =>