Repository: spark
Updated Branches:
  refs/heads/master 680fd87c6 -> f4e0b28c8


[SPARK-4142][GraphX] Default numEdgePartitions

Changing the default number of edge partitions to match Spark parallelism.

Author: Joseph E. Gonzalez <[email protected]>

Closes #3006 from jegonzal/default_partitions and squashes the following commits:

a9a5c4f [Joseph E. Gonzalez] Changing the default number of edge partitions to match Spark parallelism


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f4e0b28c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f4e0b28c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f4e0b28c

Branch: refs/heads/master
Commit: f4e0b28c859412ec8bdfdf452b6a1b2e1bee310e
Parents: 680fd87
Author: Joseph E. Gonzalez <[email protected]>
Authored: Sat Nov 1 01:18:07 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Sat Nov 1 01:18:07 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/examples/graphx/Analytics.scala    |  6 +++---
 .../scala/org/apache/spark/graphx/GraphLoader.scala     | 12 +++++++++---
 2 files changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f4e0b28c/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala
index d70d936..828cffb 100644
--- a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala
@@ -77,7 +77,7 @@ object Analytics extends Logging {
         val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")"))
 
         val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
-          minEdgePartitions = numEPart,
+          numEdgePartitions = numEPart,
           edgeStorageLevel = edgeStorageLevel,
           vertexStorageLevel = vertexStorageLevel).cache()
         val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
@@ -110,7 +110,7 @@ object Analytics extends Logging {
 
         val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")"))
         val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
-          minEdgePartitions = numEPart,
+          numEdgePartitions = numEPart,
           edgeStorageLevel = edgeStorageLevel,
           vertexStorageLevel = vertexStorageLevel).cache()
         val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
@@ -131,7 +131,7 @@ object Analytics extends Logging {
         val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")"))
         val graph = GraphLoader.edgeListFile(sc, fname,
           canonicalOrientation = true,
-          minEdgePartitions = numEPart,
+          numEdgePartitions = numEPart,
           edgeStorageLevel = edgeStorageLevel,
           vertexStorageLevel = vertexStorageLevel)
           // TriangleCount requires the graph to be partitioned

http://git-wip-us.apache.org/repos/asf/spark/blob/f4e0b28c/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
index f4c7936..4933aec 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -48,7 +48,8 @@ object GraphLoader extends Logging {
    * @param path the path to the file (e.g., /home/data/file or hdfs://file)
    * @param canonicalOrientation whether to orient edges in the positive
    *        direction
-   * @param minEdgePartitions the number of partitions for the edge RDD
+   * @param numEdgePartitions the number of partitions for the edge RDD
+   * Setting this value to -1 will use the default parallelism.
    * @param edgeStorageLevel the desired storage level for the edge partitions
   * @param vertexStorageLevel the desired storage level for the vertex partitions
    */
@@ -56,7 +57,7 @@ object GraphLoader extends Logging {
       sc: SparkContext,
       path: String,
       canonicalOrientation: Boolean = false,
-      minEdgePartitions: Int = 1,
+      numEdgePartitions: Int = -1,
       edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
       vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
     : Graph[Int, Int] =
@@ -64,7 +65,12 @@ object GraphLoader extends Logging {
     val startTime = System.currentTimeMillis
 
     // Parse the edge data table directly into edge partitions
-    val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
+    val lines =
+      if (numEdgePartitions > 0) {
+        sc.textFile(path, numEdgePartitions).coalesce(numEdgePartitions)
+      } else {
+        sc.textFile(path)
+      }
     val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
       val builder = new EdgePartitionBuilder[Int, Int]
       iter.foreach { line =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to