Repository: spark
Updated Branches:
  refs/heads/branch-1.2 2ea782a9d -> 73cb806f7


[SPARK-5351][GraphX] Do not use Partitioner.defaultPartitioner as a partitioner of EdgeRDDImpl

If the value of 'spark.default.parallelism' does not match the number of 
partitions in EdgePartition (EdgeRDDImpl), the following error occurs in 
ReplicatedVertexView.scala:72:

import scala.reflect.ClassTag

import org.apache.spark.Logging
import org.apache.spark.graphx._

object GraphTest extends Logging {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): VertexRDD[Int] = {
    // Send a constant message to both endpoints of every edge and
    // sum the messages arriving at each vertex.
    graph.aggregateMessages[Int](
      ctx => {
        ctx.sendToSrc(1)
        ctx.sendToDst(2)
      },
      _ + _)
  }
}

val g = GraphLoader.edgeListFile(sc, "graph.txt")
val rdd = GraphTest.run(g)

java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
        at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:206)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:204)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:206)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:204)
        at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:82)
        at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80)
        at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:193)
        at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:191)
    ...
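
The root cause is how Partitioner.defaultPartitioner sizes its result. Below is a 
minimal sketch of that sizing rule (paraphrased from Spark 1.2's Partitioner object; 
the helper name sketchDefaultPartitioner is mine, not Spark's, and user code must go 
through getConf rather than the package-private conf):

import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD

// When spark.default.parallelism is set, the partitioner is sized by that
// value, not by the input RDD's own partition count.
def sketchDefaultPartitioner(rdd: RDD[_]): Partitioner = {
  if (rdd.context.getConf.contains("spark.default.parallelism")) {
    new HashPartitioner(rdd.context.defaultParallelism)   // e.g. 3
  } else {
    new HashPartitioner(rdd.partitions.size)              // e.g. 4
  }
}

EdgeRDDImpl therefore advertised a partitioner whose numPartitions could differ from 
partitionsRDD's actual partition count, and the zipPartitions call in 
ReplicatedVertexView failed as shown above.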

Author: Takeshi Yamamuro <linguin....@gmail.com>

Closes #4136 from maropu/EdgePartitionBugFix and squashes the following commits:

0cd8942 [Ankur Dave] Use more concise getOrElse
aad4a2c [Ankur Dave] Add unit test for non-default number of edge partitions
0a2f32b [Takeshi Yamamuro] Do not use Partitioner.defaultPartitioner as a partitioner of EdgeRDDImpl

(cherry picked from commit e224dbb011789297cd6c6ba095f702c042869ed6)
Signed-off-by: Ankur Dave <ankurd...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73cb806f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73cb806f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73cb806f

Branch: refs/heads/branch-1.2
Commit: 73cb806f71fbc44ce2488254db177f6500fe83c7
Parents: 2ea782a
Author: Takeshi Yamamuro <linguin....@gmail.com>
Authored: Fri Jan 23 19:25:15 2015 -0800
Committer: Ankur Dave <ankurd...@gmail.com>
Committed: Fri Jan 23 19:29:42 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/graphx/impl/EdgeRDDImpl.scala  |  4 ++--
 .../org/apache/spark/graphx/GraphSuite.scala    | 20 ++++++++++++++++++++
 2 files changed, 22 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/73cb806f/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index 897c7ee..f1550ac 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -19,7 +19,7 @@ package org.apache.spark.graphx.impl
 
 import scala.reflect.{classTag, ClassTag}
 
-import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
+import org.apache.spark.{OneToOneDependency, HashPartitioner, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
@@ -46,7 +46,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
    * partitioner that allows co-partitioning with `partitionsRDD`.
    */
   override val partitioner =
-    partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
+    partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitions.size)))
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
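
With this change, the advertised partitioner is always sized by the RDD's own 
partition count, so downstream zipPartitions calls see matching counts by 
construction. For intuition, the precondition the old code could violate looks like 
this (illustrative spark-shell snippet, not part of the patch; sc is the shell's 
SparkContext):

val a = sc.parallelize(1 to 10, 4)  // like an edge RDD with 4 partitions
val b = sc.parallelize(1 to 10, 3)  // like a shuffle sized by spark.default.parallelism = 3
// a.zipPartitions(b) { (ia, ib) => ia }
// => java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions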
 

http://git-wip-us.apache.org/repos/asf/spark/blob/73cb806f/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 9da0064..ed9876b 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -386,4 +386,24 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("non-default number of edge partitions") {
+    val n = 10
+    val defaultParallelism = 3
+    val numEdgePartitions = 4
+    assert(defaultParallelism != numEdgePartitions)
+    val conf = new org.apache.spark.SparkConf()
+      .set("spark.default.parallelism", defaultParallelism.toString)
+    val sc = new SparkContext("local", "test", conf)
+    try {
+      val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)),
+        numEdgePartitions)
+      val graph = Graph.fromEdgeTuples(edges, 1)
+      val neighborAttrSums = graph.mapReduceTriplets[Int](
+        et => Iterator((et.dstId, et.srcAttr)), _ + _)
+      assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n)))
+    } finally {
+      sc.stop()
+    }
+  }
+
 }
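
For reference, the failure can also be reproduced interactively without the fix. The 
session below is a sketch: the --conf flag and edgeListFile's minEdgePartitions 
argument are standard Spark 1.2 options, "graph.txt" is the edge list from the 
report, and GraphTest is the object from the commit message, pasted into the shell 
first:

./bin/spark-shell --conf spark.default.parallelism=3

scala> import org.apache.spark.graphx._
scala> val g = GraphLoader.edgeListFile(sc, "graph.txt", minEdgePartitions = 4)
scala> GraphTest.run(g)  // threw the IllegalArgumentException above before this patch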

