[ https://issues.apache.org/jira/browse/SPARK-17097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533976#comment-15533976 ]
ding commented on SPARK-17097: ------------------------------ Because diff of case class behaves different with regular class. A case class implements the equals method while a class does not. When comparing two objects implemented as a class is actually comparing the memory address of the objects. In above code, if we remove "case", after transform, the original vertices is still different with the new generated vertices although they have the same value. In this way, EdgeTriplet is able to be updated since there is difference and after 1 iteration there will be no active message and the application will terminate. > Pregel does not keep vertex state properly; fails to terminate > --------------------------------------------------------------- > > Key: SPARK-17097 > URL: https://issues.apache.org/jira/browse/SPARK-17097 > Project: Spark > Issue Type: Bug > Components: GraphX > Affects Versions: 1.6.0 > Environment: Scala 2.10.5, Spark 1.6.0 with GraphX and Pregel > Reporter: Seth Bromberger > > Consider the following minimum example: > {code:title=PregelBug.scala|borderStyle=solid} > package testGraph > import org.apache.spark.{SparkConf, SparkContext} > import org.apache.spark.graphx.{Edge, EdgeTriplet, Graph, _} > object PregelBug { > def main(args: Array[String]) = { > //FIXME breaks if TestVertex is a case class; works if not case class > case class TestVertex(inId: VertexId, > inData: String, > inLabels: collection.mutable.HashSet[String]) extends > Serializable { > val id = inId > val value = inData > val labels = inLabels > } > class TestLink(inSrc: VertexId, inDst: VertexId, inData: String) extends > Serializable { > val src = inSrc > val dst = inDst > val data = inData > } > val startString = "XXXSTARTXXX" > val conf = new SparkConf().setAppName("pregeltest").setMaster("local[*]") > val sc = new SparkContext(conf) > val vertexes = Vector( > new TestVertex(0, "label0", collection.mutable.HashSet[String]()), > new TestVertex(1, "label1", collection.mutable.HashSet[String]()) > ) > val links = Vector( > new TestLink(0, 1, "linkData01") > ) > val vertexes_packaged = vertexes.map(v => (v.id, v)) > val links_packaged = links.map(e => Edge(e.src, e.dst, e)) > val graph = Graph[TestVertex, > TestLink](sc.parallelize(vertexes_packaged), sc.parallelize(links_packaged)) > def vertexProgram (vertexId: VertexId, vdata: TestVertex, message: > Vector[String]): TestVertex = { > message.foreach { > case `startString` => > if (vdata.id == 0L) > vdata.labels.add(vdata.value) > case m => > if (!vdata.labels.contains(m)) > vdata.labels.add(m) > } > new TestVertex(vdata.id, vdata.value, vdata.labels) > } > def sendMessage (triplet: EdgeTriplet[TestVertex, TestLink]): > Iterator[(VertexId, Vector[String])] = { > val srcLabels = triplet.srcAttr.labels > val dstLabels = triplet.dstAttr.labels > val msgsSrcDst = srcLabels.diff(dstLabels) > .map(label => (triplet.dstAttr.id, Vector[String](label))) > val msgsDstSrc = dstLabels.diff(dstLabels) > .map(label => (triplet.srcAttr.id, Vector[String](label))) > msgsSrcDst.toIterator ++ msgsDstSrc.toIterator > } > def mergeMessage (m1: Vector[String], m2: Vector[String]): Vector[String] > = m1.union(m2).distinct > val g = graph.pregel(Vector[String](startString))(vertexProgram, > sendMessage, mergeMessage) > println("---pregel done---") > println("vertex info:") > g.vertices.foreach( > v => { > val labels = v._2.labels > println( > "vertex " + v._1 + > ": name = " + v._2.id + > ", labels = " + labels) > } > ) > } > } > {code} > This code never terminates even though we expect it to. To fix, we simply > remove the "case" designation for the TestVertex class (see FIXME comment), > and then it behaves as expected. > (Apologies if this has been fixed in later versions; we're unfortunately > pegged to 2.10.5 / 1.6.0 for now.) -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org