This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 2b5e033  [SPARK-26757][GRAPHX] Return 0 for `count` on empty Edge/Vertex RDDs
2b5e033 is described below

commit 2b5e033eb937a8074e454e1995616f8a1bf370f8
Author: Huon Wilson <huon.wil...@data61.csiro.au>
AuthorDate: Thu Jan 31 17:27:11 2019 -0600

    [SPARK-26757][GRAPHX] Return 0 for `count` on empty Edge/Vertex RDDs
    
    ## What changes were proposed in this pull request?
    
    Previously a "java.lang.UnsupportedOperationException: empty
    collection" exception would be thrown due to using `reduce`, rather
    than `fold` or similar that can tolerate empty RDDs.
    
    This behaviour has existed for the Vertex RDDs since it was introduced
    in b30e0ae0351be1cbc0b1cf179293587b466ee026. It seems this behaviour
    was inherited by the Edge RDDs via copy-paste in
    ee29ef3800438501e0ff207feb00a28973fc0769.
    
    ## How was this patch tested?
    
    Two new unit tests.
    
    Closes #23681 from huonw/empty-graphx.
    
    Authored-by: Huon Wilson <huon.wil...@data61.csiro.au>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
    (cherry picked from commit da526985c7574dccdcc0cca7452e2e999a5b3012)
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala |  2 +-
 .../scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala    |  2 +-
 .../main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala  |  2 +-
 .../src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala | 10 ++++++++++
 .../test/scala/org/apache/spark/graphx/VertexRDDSuite.scala   | 11 +++++++++++
 .../scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala  |  9 +++++++++
 6 files changed, 33 insertions(+), 3 deletions(-)

diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index 376c7b0..eb8abd1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -87,7 +87,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] 
(
 
   /** The number of edges in the RDD. */
   override def count(): Long = {
-    partitionsRDD.map(_._2.size.toLong).reduce(_ + _)
+    partitionsRDD.map(_._2.size.toLong).fold(0)(_ + _)
   }
 
   override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDDImpl[ED2, 
VD] =
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 3c6f22d..2da9762 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -87,7 +87,7 @@ class VertexRDDImpl[VD] private[graphx] (
 
   /** The number of vertices in the RDD. */
   override def count(): Long = {
-    partitionsRDD.map(_.size.toLong).reduce(_ + _)
+    partitionsRDD.map(_.size.toLong).fold(0)(_ + _)
   }
 
   override private[graphx] def mapVertexPartitions[VD2: ClassTag](
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
index 59fdd85..2847a4e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -72,7 +72,7 @@ object SVDPlusPlus {
 
     // calculate global rating mean
     edges.cache()
-    val (rs, rc) = edges.map(e => (e.attr, 1L)).reduce((a, b) => (a._1 + b._1, 
a._2 + b._2))
+    val (rs, rc) = edges.map(e => (e.attr, 1L)).fold((0, 0))((a, b) => (a._1 + 
b._1, a._2 + b._2))
     val u = rs / rc
 
     // construct graph
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
index 7a24e32..8fd3e6f 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala
@@ -60,4 +60,14 @@ class EdgeRDDSuite extends SparkFunSuite with 
LocalSparkContext {
     }
   }
 
+  test("count") {
+    withSpark { sc =>
+      val empty = EdgeRDD.fromEdges(sc.emptyRDD[Edge[Int]])
+      assert(empty.count === 0)
+
+      val edges = List(Edge(0, 1, ()), Edge(1, 2, ()), Edge(2, 0, ()))
+      val nonempty = EdgeRDD.fromEdges(sc.parallelize(edges))
+      assert(nonempty.count === edges.size)
+    }
+  }
 }
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index 8e63043..434e6a8 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -223,4 +223,15 @@ class VertexRDDSuite extends SparkFunSuite with 
LocalSparkContext {
       assert(verts.collect().toSeq === data) // test checkpointed RDD
     }
   }
+
+  test("count") {
+    withSpark { sc =>
+      val empty = VertexRDD(sc.emptyRDD[(Long, Unit)])
+      assert(empty.count === 0)
+
+      val n = 100
+      val nonempty = vertices(sc, n)
+      assert(nonempty.count === n + 1)
+    }
+  }
 }
diff --git 
a/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala
index 2991438..da0457c 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala
@@ -40,4 +40,13 @@ class SVDPlusPlusSuite extends SparkFunSuite with 
LocalSparkContext {
     }
   }
 
+  test("Test SVD++ with no edges") {
+    withSpark { sc =>
+      val edges = sc.emptyRDD[Edge[Double]]
+      val conf = new SVDPlusPlus.Conf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 
0.015) // 2 iterations
+      val (graph, _) = SVDPlusPlus.run(edges, conf)
+      assert(graph.vertices.count == 0)
+      assert(graph.edges.count == 0)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to