[ https://issues.apache.org/jira/browse/SPARK-17097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533976#comment-15533976 ]

ding commented on SPARK-17097:
------------------------------

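This happens because diff treats a case class differently from a regular class.
A case class implements equals by comparing its constructor fields, while a
regular class inherits equals from AnyRef, so comparing two of its instances
compares their references (object identity), not their contents.

In the code above, vertexProgram adds labels to vdata.labels in place and then
wraps that same HashSet in the new TestVertex. With the case class, the original
vertex and the newly generated one therefore compare equal, no difference is
detected, the EdgeTriplet is not updated with the new vertex state, and the
computation never terminates, as reported. If we remove "case", then after the
transform the original vertices are still different from the newly generated
vertices, even though they hold the same value. Because there is a difference,
the EdgeTriplet is updated; after one iteration there are no active messages
and the application terminates.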
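To make the equality difference concrete, here is a small plain-Scala sketch (no Spark needed). CaseVertex, PlainVertex, updateCase, updatePlain and changedIds are hypothetical names; changedIds is only a simplified stand-in for the vertex diff mentioned above, not GraphX's implementation. Both update functions mimic the reporter's vertexProgram by mutating the shared HashSet and wrapping it in a new object.

```scala
import scala.collection.mutable

// Hypothetical, cut-down stand-ins for the reporter's TestVertex: one as a
// case class (structural equals), one as a plain class (reference equals).
case class CaseVertex(id: Long, labels: mutable.HashSet[String])
class PlainVertex(val id: Long, val labels: mutable.HashSet[String])

object EqualityDemo {
  // Like the reporter's vertexProgram: add to the shared set in place,
  // then wrap the SAME set in a brand-new vertex object.
  def updateCase(v: CaseVertex, label: String): CaseVertex = {
    v.labels.add(label)
    CaseVertex(v.id, v.labels)
  }

  def updatePlain(v: PlainVertex, label: String): PlainVertex = {
    v.labels.add(label)
    new PlainVertex(v.id, v.labels)
  }

  // Simplified stand-in for a vertex diff: ids whose new attribute is != the old one.
  // Illustration only; this is not GraphX's VertexRDD code.
  def changedIds[A](oldVs: Map[Long, A], newVs: Map[Long, A]): Set[Long] =
    newVs.collect { case (id, v) if oldVs.get(id).exists(_ != v) => id }.toSet

  def caseClassChanges(): Set[Long] = {
    val before = Map(0L -> CaseVertex(0L, mutable.HashSet.empty[String]))
    val after = before.map { case (id, v) => id -> updateCase(v, "label0") }
    changedIds(before, after)
  }

  def plainClassChanges(): Set[Long] = {
    val before = Map(0L -> new PlainVertex(0L, mutable.HashSet.empty[String]))
    val after = before.map { case (id, v) => id -> updatePlain(v, "label0") }
    changedIds(before, after)
  }

  def main(args: Array[String]): Unit = {
    println(caseClassChanges())  // prints Set(): old and new share one mutated set
    println(plainClassChanges()) // prints Set(0): a new object is never == the old one
  }
}
```

Note that in the case-class run the "before" vertex is not a snapshot: the in-place add changed it too, so no comparison can see the update.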

> Pregel does not keep vertex state properly; fails to terminate 
> ---------------------------------------------------------------
>
>                 Key: SPARK-17097
>                 URL: https://issues.apache.org/jira/browse/SPARK-17097
>             Project: Spark
>          Issue Type: Bug
>          Components: GraphX
>    Affects Versions: 1.6.0
>         Environment: Scala 2.10.5, Spark 1.6.0 with GraphX and Pregel
>            Reporter: Seth Bromberger
>
> Consider the following minimal example:
> {code:title=PregelBug.scala|borderStyle=solid}
> package testGraph
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.graphx.{Edge, EdgeTriplet, Graph, _}
> object PregelBug {
>   def main(args: Array[String]) = {
>     //FIXME breaks if TestVertex is a case class; works if not case class
>     case class TestVertex(inId: VertexId,
>                      inData: String,
>                      inLabels: collection.mutable.HashSet[String]) extends Serializable {
>       val id = inId
>       val value = inData
>       val labels = inLabels
>     }
>     class TestLink(inSrc: VertexId, inDst: VertexId, inData: String) extends Serializable {
>       val src = inSrc
>       val dst = inDst
>       val data = inData
>     }
>     val startString = "XXXSTARTXXX"
>     val conf = new SparkConf().setAppName("pregeltest").setMaster("local[*]")
>     val sc = new SparkContext(conf)
>     val vertexes = Vector(
>       new TestVertex(0, "label0", collection.mutable.HashSet[String]()),
>       new TestVertex(1, "label1", collection.mutable.HashSet[String]())
>     )
>     val links = Vector(
>       new TestLink(0, 1, "linkData01")
>     )
>     val vertexes_packaged = vertexes.map(v => (v.id, v))
>     val links_packaged = links.map(e => Edge(e.src, e.dst, e))
>     val graph = Graph[TestVertex, TestLink](sc.parallelize(vertexes_packaged), sc.parallelize(links_packaged))
>     def vertexProgram (vertexId: VertexId, vdata: TestVertex, message: Vector[String]): TestVertex = {
>       message.foreach {
>         case `startString` =>
>           if (vdata.id == 0L)
>             vdata.labels.add(vdata.value)
>         case m =>
>           if (!vdata.labels.contains(m))
>             vdata.labels.add(m)
>       }
>       new TestVertex(vdata.id, vdata.value, vdata.labels)
>     }
>     def sendMessage (triplet: EdgeTriplet[TestVertex, TestLink]): Iterator[(VertexId, Vector[String])] = {
>       val srcLabels = triplet.srcAttr.labels
>       val dstLabels = triplet.dstAttr.labels
>       val msgsSrcDst = srcLabels.diff(dstLabels)
>         .map(label => (triplet.dstAttr.id, Vector[String](label)))
>       val msgsDstSrc = dstLabels.diff(srcLabels)
>         .map(label => (triplet.srcAttr.id, Vector[String](label)))
>       msgsSrcDst.toIterator ++ msgsDstSrc.toIterator
>     }
>     def mergeMessage (m1: Vector[String], m2: Vector[String]): Vector[String] = m1.union(m2).distinct
>     val g = graph.pregel(Vector[String](startString))(vertexProgram, sendMessage, mergeMessage)
>     println("---pregel done---")
>     println("vertex info:")
>     g.vertices.foreach(
>       v => {
>         val labels = v._2.labels
>         println(
>           "vertex " + v._1 +
>             ": name = " + v._2.id +
>             ", labels = " + labels)
>       }
>     )
>   }
> }
> {code}
> This code never terminates even though we expect it to. To fix, we simply 
> remove the "case" designation for the TestVertex class (see FIXME comment), 
> and then it behaves as expected.
> (Apologies if this has been fixed in later versions; we're unfortunately 
> pegged to 2.10.5 / 1.6.0 for now.)
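If keeping TestVertex as a case class is preferable, one alternative (an assumption on our part, not something proposed in this thread, and not tested against Spark 1.6) is to stop mutating the label set inside vertexProgram. The sketch swaps in a hypothetical ImmutableVertex whose labels are an immutable Set[String], and uses Long instead of GraphX's VertexId alias so it runs without Spark. Because every update builds a new set, equals reports a change exactly when the labels actually changed.

```scala
// Hypothetical variant of the reporter's TestVertex: still a case class, but
// the labels are an immutable Set, so old and new vertices never share state.
case class ImmutableVertex(id: Long, value: String, labels: Set[String])

object ImmutableVertexProgram {
  val startString = "XXXSTARTXXX"

  // Same rules as the reporter's vertexProgram, but it returns a copy with a
  // new label set instead of calling labels.add(...) on a shared mutable set.
  def vertexProgram(vertexId: Long, vdata: ImmutableVertex, message: Vector[String]): ImmutableVertex = {
    val added = message.flatMap {
      case `startString` => if (vdata.id == 0L) Vector(vdata.value) else Vector.empty[String]
      case m             => Vector(m)
    }
    vdata.copy(labels = vdata.labels ++ added)
  }

  def main(args: Array[String]): Unit = {
    val v0 = ImmutableVertex(0L, "label0", Set.empty)
    val afterStart = vertexProgram(0L, v0, Vector(startString))
    println(v0 == afterStart)          // prints false: a label was really added
    val afterRepeat = vertexProgram(0L, afterStart, Vector("label0"))
    println(afterStart == afterRepeat) // prints true: nothing new, so no change
  }
}
```

This only shows that equality now tracks real changes; whether it alone makes the original graph.pregel call terminate depends on GraphX internals not verified here.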



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)