Repository: spark Updated Branches: refs/heads/branch-1.1 abdb90bd7 -> 1b282cdfd
[SPARK-4115][GraphX] Add overrided count for edge counting of EdgeRDD. Accumulate sizes of all the EdgePartitions just like the VertexRDD. Author: luluorta <[email protected]> Closes #2975 from luluorta/graph-edge-count and squashes the following commits: 86ef0e5 [luluorta] Add overrided count for edge counting of EdgeRDD. (cherry picked from commit ee29ef3800438501e0ff207feb00a28973fc0769) Signed-off-by: Reynold Xin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b282cdf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b282cdf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b282cdf Branch: refs/heads/branch-1.1 Commit: 1b282cdfda13e057b9cd85e1d71847d366fe7fcb Parents: abdb90b Author: luluorta <[email protected]> Authored: Sat Nov 1 01:22:46 2014 -0700 Committer: Reynold Xin <[email protected]> Committed: Sat Nov 1 01:23:00 2014 -0700 ---------------------------------------------------------------------- graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1b282cdf/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala index 899a3cb..65c2b09 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala @@ -74,12 +74,17 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( this } - /** Persists the vertex partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY. */ + /** Persists the edge partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY. */ override def cache(): this.type = { partitionsRDD.persist(targetStorageLevel) this } + /** The number of edges in the RDD. */ + override def count(): Long = { + partitionsRDD.map(_._2.size.toLong).reduce(_ + _) + } + private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag]( f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = { this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter => --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
