Hi all, when building a graph from edge tuples with the `Graph.fromEdgeTuples` API, the edges are already an `RDD[Edge]`. Inside `EdgeRDD.fromEdges`, it would be better for `EdgePartitionBuilder.add` to take the `Edge` object itself as its parameter, so there is no need to create a new `Edge` object again.
/**
 * Builds a graph from raw `(srcId, dstId)` pairs.
 *
 * Each pair becomes an `Edge` whose attribute is the constant `1`, and every vertex the
 * edges reference is given `defaultValue`.
 *
 * @param rawEdges           the vertex-id pairs to turn into edges
 * @param defaultValue       the attribute assigned to every referenced vertex
 * @param uniqueEdges        if `Some(strategy)`, the graph is repartitioned with `strategy`
 *                           and then `groupEdges` is applied with an additive merge, so the
 *                           `Int` attributes of edges it combines are summed; if `None`, the
 *                           graph is returned as built
 * @param edgeStorageLevel   storage level handed to `GraphImpl` for the edges
 * @param vertexStorageLevel storage level handed to `GraphImpl` for the vertices
 * @return the constructed graph with `Int` edge attributes
 */
def fromEdgeTuples[VD: ClassTag](
    rawEdges: RDD[(VertexId, VertexId)],
    defaultValue: VD,
    uniqueEdges: Option[PartitionStrategy] = None,
    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] = {
  // Every tuple contributes one edge weighted 1, so summing during grouping yields a count.
  val unitEdges = rawEdges.map(pair => Edge(pair._1, pair._2, 1))
  val baseGraph = GraphImpl(unitEdges, defaultValue, edgeStorageLevel, vertexStorageLevel)
  uniqueEdges
    .map(strategy => baseGraph.partitionBy(strategy).groupEdges(_ + _))
    .getOrElse(baseGraph)
}

object GraphImpl {

  /**
   * Creates a graph from an RDD of edges; every vertex referenced by an edge receives
   * `defaultVertexAttr` as its attribute.
   *
   * @param edges              the edges of the graph
   * @param defaultVertexAttr  the attribute given to each referenced vertex
   * @param edgeStorageLevel   storage level forwarded to `fromEdgeRDD` for the edges
   * @param vertexStorageLevel storage level forwarded to `fromEdgeRDD` for the vertices
   * @return the resulting graph
   */
  def apply[VD: ClassTag, ED: ClassTag](
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD,
      edgeStorageLevel: StorageLevel,
      vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
    // The edge RDD is built inline so its vertex type parameter is inferred from this call site.
    fromEdgeRDD(
      EdgeRDD.fromEdges(edges),
      defaultVertexAttr,
      edgeStorageLevel,
      vertexStorageLevel)
  }

object EdgeRDD {

  /**
   * Builds an `EdgeRDD` by packing the edges of each input partition into one edge
   * partition, tagged with that input partition's index.
   *
   * @tparam ED the edge attribute type
   * @tparam VD the vertex attribute type that may later be joined with the returned EdgeRDD
   * @param edges the edges to pack
   * @return an EdgeRDD built from the packed partitions
   */
  def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = {
    val edgePartitions = edges.mapPartitionsWithIndex { (partitionId, edgeIter) =>
      val partitionBuilder = new EdgePartitionBuilder[ED, VD]
      for (edge <- edgeIter) {
        // NOTE(review): the email proposes passing `edge` itself to the builder. That needs an
        // `add(Edge[ED])` overload not visible here, and would let the builder keep a reference
        // to the caller's Edge instance; confirm upstream iterators never reuse Edge objects
        // before adopting it.
        partitionBuilder.add(edge.srcId, edge.dstId, edge.attr)
      }
      Iterator((partitionId, partitionBuilder.toEdgePartition))
    }
    EdgeRDD.fromEdgePartitions(edgePartitions)
  }
smime.p7s
Description: S/MIME cryptographic signature