Repository: spark
Updated Branches:
  refs/heads/master 6481d2742 -> e5d376801


[SPARK-3263][GraphX] Fix changes made to GraphGenerator.logNormalGraph in PR #720

PR #720 made multiple changes to GraphGenerator.logNormalGraph including:

* Replacing the call to functions for generating random vertices and edges with 
in-line implementations with different equations. Based on reading the Pregel 
paper, I believe the in-line functions are incorrect.
* Hard-coding of RNG seeds so that the method now generates the same graph for a 
given number of vertices, edges, mu, and sigma -- the user is not able to override 
the seed or specify that the seed should be randomly generated.
* Backwards-incompatible change to logNormalGraph signature with introduction 
of new required parameter.
* Failed to update scala docs and programming guide for API changes
* Added a Synthetic Benchmark in the examples.

This PR:
* Removes the in-line calls and calls original vertex / edge generation 
functions again
* Adds an optional seed parameter for deterministic behavior (when desired)
* Keeps the number of partitions parameter that was added.
* Keeps compatibility with the synthetic benchmark example
* Maintains backwards-compatible API
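
The optional-seed behavior listed above can be sketched without Spark as follows. This is only an illustration of the seeding scheme visible in the diff, not the GraphX implementation: `SketchEdge`, `SeedSketch`, `makeRandom`, `randomEdges`, and `edgesForAllVertices` are hypothetical names, and a fixed degree per vertex is assumed in place of log-normal sampling.

```scala
import scala.util.Random

// Hypothetical stand-in for the GraphX Edge class, so the sketch runs without Spark.
final case class SketchEdge(srcId: Long, dstId: Long, attr: Int)

object SeedSketch {
  // Sentinel convention from the commit: -1 means "choose a random seed".
  def makeRandom(seed: Long = -1): Random =
    if (seed == -1) new Random() else new Random(seed)

  // Generates numEdges edges from src to destinations drawn uniformly
  // from [0, maxVertexId), each with weight 1.
  def randomEdges(
      src: Int, numEdges: Int, maxVertexId: Int, seed: Long = -1): Array[SketchEdge] = {
    val rand = makeRandom(seed)
    Array.fill(numEdges) { SketchEdge(src, rand.nextInt(maxVertexId), 1) }
  }

  // Draws one sub-seed from a master RNG, then XORs it with each vertex id
  // so that every vertex gets its own reproducible random stream.
  // Assumption: every vertex has the same fixed out-degree.
  def edgesForAllVertices(
      numVertices: Int, degree: Int, seed: Long = -1): Vector[SketchEdge] = {
    val seedRand = makeRandom(seed)
    val edgeSeed = seedRand.nextInt()
    (0 until numVertices).flatMap { src =>
      randomEdges(src, degree, numVertices, seed = edgeSeed ^ src).toVector
    }.toVector
  }
}
```

Calling `SeedSketch.edgesForAllVertices(10, 3, seed = 12345L)` twice should give identical edges, while omitting `seed` gives a different graph on each call. One caveat of using -1 as the sentinel: if a derived per-vertex seed happens to equal -1, the helper falls back to a random seed for that vertex.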

Author: RJ Nowling <rnowl...@gmail.com>
Author: Ankur Dave <ankurd...@gmail.com>

Closes #2168 from rnowling/graphgenrand and squashes the following commits:

f1cd79f [Ankur Dave] Style fixes
e11918e [RJ Nowling] Fix bad comparisons in unit tests
785ac70 [RJ Nowling] Fix style error
c70868d [RJ Nowling] Fix logNormalGraph scala doc for seed
41fd1f8 [RJ Nowling] Fix logNormalGraph scala doc for seed
799f002 [RJ Nowling] Added test for different seeds for sampleLogNormal
43949ad [RJ Nowling] Added test for different seeds for generateRandomEdges
2faf75f [RJ Nowling] Added unit test for logNormalGraph
82f22397 [RJ Nowling] Add unit test for sampleLogNormal
b99cba9 [RJ Nowling] Make sampleLogNormal private to Spark (vs private) for 
unit testing
6803da1 [RJ Nowling] Add GraphGeneratorsSuite with test for generateRandomEdges
1c8fc44 [RJ Nowling] Connected components part of SynthBenchmark was failing to 
call count on RDD before printing
dfbb6dd [RJ Nowling] Fix parameter name in SynthBenchmark docs
b5eeb80 [RJ Nowling] Add optional seed parameter to SynthBenchmark and set 
default to randomly generate a seed
1ff8d30 [RJ Nowling] Fix bug in generateRandomEdges where numVertices instead 
of numEdges was used to control number of edges to generate
98bb73c [RJ Nowling] Add documentation for logNormalGraph parameters
d40141a [RJ Nowling] Fix style error
684804d [RJ Nowling] revert PR #720 which introduce errors in logNormalGraph 
and messed up seeding of RNGs.  Add user-defined optional seed for 
deterministic behavior
c183136 [RJ Nowling] Fix to deterministic GraphGenerators.logNormalGraph that 
allows generating graphs randomly using optional seed.
015010c [RJ Nowling] Fixed GraphGenerator logNormalGraph API to make 
backward-incompatible change in commit 894ecde04


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e5d37680
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e5d37680
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e5d37680

Branch: refs/heads/master
Commit: e5d376801d57dffb0791980a1786a0a9b45bc491
Parents: 6481d27
Author: RJ Nowling <rnowl...@gmail.com>
Authored: Wed Sep 3 14:15:22 2014 -0700
Committer: Ankur Dave <ankurd...@gmail.com>
Committed: Wed Sep 3 14:16:06 2014 -0700

----------------------------------------------------------------------
 .../spark/examples/graphx/SynthBenchmark.scala  |   9 +-
 .../spark/graphx/util/GraphGenerators.scala     |  65 ++++++-----
 .../graphx/util/GraphGeneratorsSuite.scala      | 110 +++++++++++++++++++
 3 files changed, 152 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e5d37680/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
index 551c339..5f35a58 100644
--- a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
@@ -38,12 +38,13 @@ object SynthBenchmark {
    * Options:
    *   -app "pagerank" or "cc" for pagerank or connected components. (Default: pagerank)
    *   -niters the number of iterations of pagerank to use (Default: 10)
-   *   -numVertices the number of vertices in the graph (Default: 1000000)
+   *   -nverts the number of vertices in the graph (Default: 1000000)
    *   -numEPart the number of edge partitions in the graph (Default: number of cores)
    *   -partStrategy the graph partitioning strategy to use
    *   -mu the mean parameter for the log-normal graph (Default: 4.0)
    *   -sigma the stdev parameter for the log-normal graph (Default: 1.3)
    *   -degFile the local file to save the degree information (Default: Empty)
+   *   -seed seed to use for RNGs (Default: -1, picks seed randomly)
    */
   def main(args: Array[String]) {
     val options = args.map {
@@ -62,6 +63,7 @@ object SynthBenchmark {
     var mu: Double = 4.0
     var sigma: Double = 1.3
     var degFile: String = ""
+    var seed: Int = -1
 
     options.foreach {
       case ("app", v) => app = v
@@ -72,6 +74,7 @@ object SynthBenchmark {
       case ("mu", v) => mu = v.toDouble
       case ("sigma", v) => sigma = v.toDouble
       case ("degFile", v) => degFile = v
+      case ("seed", v) => seed = v.toInt
       case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
     }
 
@@ -85,7 +88,7 @@ object SynthBenchmark {
     // Create the graph
     println(s"Creating graph...")
     val unpartitionedGraph = GraphGenerators.logNormalGraph(sc, numVertices,
-      numEPart.getOrElse(sc.defaultParallelism), mu, sigma)
+      numEPart.getOrElse(sc.defaultParallelism), mu, sigma, seed)
     // Repartition the graph
     val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)).cache()
 
@@ -113,7 +116,7 @@ object SynthBenchmark {
       println(s"Total PageRank = $totalPR")
     } else if (app == "cc") {
       println("Running Connected Components")
-      val numComponents = graph.connectedComponents.vertices.map(_._2).distinct()
+      val numComponents = graph.connectedComponents.vertices.map(_._2).distinct().count()
       println(s"Number of components = $numComponents")
     }
     val runTime = System.currentTimeMillis() - startTime

http://git-wip-us.apache.org/repos/asf/spark/blob/e5d37680/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
index 6014954..b830928 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
@@ -40,7 +40,7 @@ object GraphGenerators {
   val RMATd = 0.25
 
   /**
-   * Generate a graph whose vertex out degree is log normal.
+   * Generate a graph whose vertex out degree distribution is log normal.
    *
    * The default values for mu and sigma are taken from the Pregel paper:
    *
@@ -48,33 +48,36 @@ object GraphGenerators {
    * Ilan Horn, Naty Leiser, and Grzegorz Czajkowski. 2010.
    * Pregel: a system for large-scale graph processing. SIGMOD '10.
    *
-   * @param sc
-   * @param numVertices
-   * @param mu
-   * @param sigma
-   * @return
+   * If the seed is -1 (default), a random seed is chosen. Otherwise, use
+   * the user-specified seed.
+   *
+   * @param sc Spark Context
+   * @param numVertices number of vertices in generated graph
+   * @param numEParts (optional) number of partitions
+   * @param mu (optional, default: 4.0) mean of out-degree distribution
+   * @param sigma (optional, default: 1.3) standard deviation of out-degree distribution
+   * @param seed (optional, default: -1) seed for RNGs, -1 causes a random seed to be chosen
+   * @return Graph object
    */
-  def logNormalGraph(sc: SparkContext, numVertices: Int, numEParts: Int,
-                     mu: Double = 4.0, sigma: Double = 1.3): Graph[Long, Int] = {
-    val vertices = sc.parallelize(0 until numVertices, numEParts).map { src =>
-      // Initialize the random number generator with the source vertex id
-      val rand = new Random(src)
-      val degree = math.min(numVertices.toLong, math.exp(rand.nextGaussian() * sigma + mu).toLong)
-      (src.toLong, degree)
+  def logNormalGraph(
+      sc: SparkContext, numVertices: Int, numEParts: Int = 0, mu: Double = 4.0,
+      sigma: Double = 1.3, seed: Long = -1): Graph[Long, Int] = {
+
+    val evalNumEParts = if (numEParts == 0) sc.defaultParallelism else numEParts
+
+    // Enable deterministic seeding
+    val seedRand = if (seed == -1) new Random() else new Random(seed)
+    val seed1 = seedRand.nextInt()
+    val seed2 = seedRand.nextInt()
+
+    val vertices: RDD[(VertexId, Long)] = sc.parallelize(0 until numVertices, evalNumEParts).map {
+      src => (src, sampleLogNormal(mu, sigma, numVertices, seed = (seed1 ^ src)))
     }
+
     val edges = vertices.flatMap { case (src, degree) =>
-      new Iterator[Edge[Int]] {
-        // Initialize the random number generator with the source vertex id
-        val rand = new Random(src)
-        var i = 0
-        override def hasNext(): Boolean = { i < degree }
-        override def next(): Edge[Int] = {
-          val nextEdge = Edge[Int](src, rand.nextInt(numVertices), i)
-          i += 1
-          nextEdge
-        }
-      }
+      generateRandomEdges(src.toInt, degree.toInt, numVertices, seed = (seed2 ^ src))
     }
+
     Graph(vertices, edges, 0)
   }
 
@@ -82,9 +85,10 @@ object GraphGenerators {
   // the edge data is the weight (default 1)
   val RMATc = 0.15
 
-  def generateRandomEdges(src: Int, numEdges: Int, maxVertexId: Int): Array[Edge[Int]] = {
-    val rand = new Random()
-    Array.fill(maxVertexId) { Edge[Int](src, rand.nextInt(maxVertexId), 1) }
+  def generateRandomEdges(
+      src: Int, numEdges: Int, maxVertexId: Int, seed: Long = -1): Array[Edge[Int]] = {
+    val rand = if (seed == -1) new Random() else new Random(seed)
+    Array.fill(numEdges) { Edge[Int](src, rand.nextInt(maxVertexId), 1) }
   }
 
   /**
@@ -97,9 +101,12 @@ object GraphGenerators {
    * @param mu the mean of the normal distribution
    * @param sigma the standard deviation of the normal distribution
    * @param maxVal exclusive upper bound on the value of the sample
+   * @param seed optional seed
    */
-  private def sampleLogNormal(mu: Double, sigma: Double, maxVal: Int): Int = {
-    val rand = new Random()
+  private[spark] def sampleLogNormal(
+      mu: Double, sigma: Double, maxVal: Int, seed: Long = -1): Int = {
+    val rand = if (seed == -1) new Random() else new Random(seed)
+
     val sigmaSq = sigma * sigma
     val m = math.exp(mu + sigmaSq / 2.0)
     // expm1 is exp(m)-1 with better accuracy for tiny m

http://git-wip-us.apache.org/repos/asf/spark/blob/e5d37680/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala
new file mode 100644
index 0000000..b346d4d
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.util
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.graphx.LocalSparkContext
+
+class GraphGeneratorsSuite extends FunSuite with LocalSparkContext {
+
+  test("GraphGenerators.generateRandomEdges") {
+    val src = 5
+    val numEdges10 = 10
+    val numEdges20 = 20
+    val maxVertexId = 100
+
+    val edges10 = GraphGenerators.generateRandomEdges(src, numEdges10, maxVertexId)
+    assert(edges10.length == numEdges10)
+
+    val correctSrc = edges10.forall(e => e.srcId == src)
+    assert(correctSrc)
+
+    val correctWeight = edges10.forall(e => e.attr == 1)
+    assert(correctWeight)
+
+    val correctRange = edges10.forall(e => e.dstId >= 0 && e.dstId <= maxVertexId)
+    assert(correctRange)
+
+    val edges20 = GraphGenerators.generateRandomEdges(src, numEdges20, maxVertexId)
+    assert(edges20.length == numEdges20)
+
+    val edges10_round1 =
+      GraphGenerators.generateRandomEdges(src, numEdges10, maxVertexId, seed = 12345)
+    val edges10_round2 =
+      GraphGenerators.generateRandomEdges(src, numEdges10, maxVertexId, seed = 12345)
+    assert(edges10_round1.zip(edges10_round2).forall { case (e1, e2) =>
+      e1.srcId == e2.srcId && e1.dstId == e2.dstId && e1.attr == e2.attr
+    })
+
+    val edges10_round3 =
+      GraphGenerators.generateRandomEdges(src, numEdges10, maxVertexId, seed = 3467)
+    assert(!edges10_round1.zip(edges10_round3).forall { case (e1, e2) =>
+      e1.srcId == e2.srcId && e1.dstId == e2.dstId && e1.attr == e2.attr
+    })
+  }
+
+  test("GraphGenerators.sampleLogNormal") {
+    val mu = 4.0
+    val sigma = 1.3
+    val maxVal = 100
+
+    val dstId = GraphGenerators.sampleLogNormal(mu, sigma, maxVal)
+    assert(dstId < maxVal)
+
+    val dstId_round1 = GraphGenerators.sampleLogNormal(mu, sigma, maxVal, 12345)
+    val dstId_round2 = GraphGenerators.sampleLogNormal(mu, sigma, maxVal, 12345)
+    assert(dstId_round1 == dstId_round2)
+
+    val dstId_round3 = GraphGenerators.sampleLogNormal(mu, sigma, maxVal, 789)
+    assert(dstId_round1 != dstId_round3)
+  }
+
+  test("GraphGenerators.logNormalGraph") {
+    withSpark { sc =>
+      val mu = 4.0
+      val sigma = 1.3
+      val numVertices100 = 100
+
+      val graph = GraphGenerators.logNormalGraph(sc, numVertices100, mu = mu, sigma = sigma)
+      assert(graph.vertices.count() == numVertices100)
+
+      val graph_round1 =
+        GraphGenerators.logNormalGraph(sc, numVertices100, mu = mu, sigma = sigma, seed = 12345)
+      val graph_round2 =
+        GraphGenerators.logNormalGraph(sc, numVertices100, mu = mu, sigma = sigma, seed = 12345)
+
+      val graph_round1_edges = graph_round1.edges.collect()
+      val graph_round2_edges = graph_round2.edges.collect()
+
+      assert(graph_round1_edges.zip(graph_round2_edges).forall { case (e1, e2) =>
+        e1.srcId == e2.srcId && e1.dstId == e2.dstId && e1.attr == e2.attr
+      })
+
+      val graph_round3 =
+        GraphGenerators.logNormalGraph(sc, numVertices100, mu = mu, sigma = sigma, seed = 567)
+
+      val graph_round3_edges = graph_round3.edges.collect()
+
+      assert(!graph_round1_edges.zip(graph_round3_edges).forall { case (e1, e2) =>
+        e1.srcId == e2.srcId && e1.dstId == e2.dstId && e1.attr == e2.attr
+      })
+    }
+  }
+
+}


