[ 
https://issues.apache.org/jira/browse/SPARK-17097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15488609#comment-15488609
 ] 

ding commented on SPARK-17097:
------------------------------

I am afraid the failure of the attached sample code to terminate when 
TestVertex is a case class is not caused by a bug in Pregel or GraphX. In the 
sample code, the vertex attribute (inLabels here) is updated in place, which is 
not supposed to happen. As a result, after the transform operations the 
original vertices are updated as well, and when VD is a case class they compare 
equal to the newly generated vertices. Because no difference between the old 
and new vertices is detected, the EdgeTriplet is never updated. So in the 
EdgeTriplet, dstLabels is always empty while srcLabels contains a value, and 
there is always an active message, which prevents Pregel from terminating.
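
To illustrate the point without Spark, here is a minimal sketch of why only the 
case class version looks "unchanged". CaseVertex, PlainVertex and AliasingDemo 
are hypothetical stand-ins for the classes in the report, and the assumption 
(not verified here against the 1.6.0 source) is that GraphX decides whether a 
vertex changed by comparing the old and new attribute with ==.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for TestVertex, with and without "case".
case class CaseVertex(id: Long, value: String, labels: mutable.HashSet[String])
class PlainVertex(val id: Long, val value: String, val labels: mutable.HashSet[String])

object AliasingDemo {
  // Mimics the in-place update: mutate the shared set, then wrap it in a new object.
  def updateInPlace(v: CaseVertex): CaseVertex = {
    v.labels.add(v.value)
    CaseVertex(v.id, v.value, v.labels)
  }

  def updateInPlace(v: PlainVertex): PlainVertex = {
    v.labels.add(v.value)
    new PlainVertex(v.id, v.value, v.labels)
  }

  def main(args: Array[String]): Unit = {
    val oldCase = CaseVertex(0L, "label0", mutable.HashSet[String]())
    val newCase = updateInPlace(oldCase)
    // Case classes compare field by field, and both objects hold the same set.
    println(oldCase == newCase) // true

    val oldPlain = new PlainVertex(0L, "label0", mutable.HashSet[String]())
    val newPlain = updateInPlace(oldPlain)
    // A plain class falls back to reference equality, so a change is seen.
    println(oldPlain == newPlain) // false
  }
}
```

In both cases the old vertex has been modified behind the framework's back; the 
plain class only appears to work because the two objects are never equal.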

One way to fix the problem is to remove the in-place update in vertexProgram by 
cloning the labels and applying the updates to the clone. I have tried the code 
below and it works.
    def vertexProgram(vertexId: VertexId, vdata: TestVertex,
                      message: Vector[String]): TestVertex = {
      // Work on a copy so the incoming vertex is left untouched.
      val labels = vdata.labels.clone()
      message.foreach {
        case `startString` =>
          if (vdata.id == 0L)
            labels.add(vdata.value)
        case m =>
          if (!vdata.labels.contains(m))
            labels.add(m)
      }
      new TestVertex(vdata.id, vdata.value, labels)
    }
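
As a quick sanity check of the clone-based approach, the following sketch runs 
the same logic outside Spark. TestVertex here is a hypothetical standalone copy 
of the class from the report, the unused vertexId parameter is dropped, and 
CloneFixDemo is a name made up for this example.

```scala
import scala.collection.mutable

// Hypothetical standalone copy of TestVertex from the report.
case class TestVertex(id: Long, value: String, labels: mutable.HashSet[String])

object CloneFixDemo {
  val startString = "XXXSTARTXXX"

  // Same logic as the fixed vertexProgram, without the vertexId parameter.
  def vertexProgram(vdata: TestVertex, message: Vector[String]): TestVertex = {
    val labels = vdata.labels.clone() // copy, so vdata.labels is never mutated
    message.foreach {
      case `startString` =>
        if (vdata.id == 0L) labels.add(vdata.value)
      case m =>
        labels.add(m) // add is a no-op when m is already present
    }
    TestVertex(vdata.id, vdata.value, labels)
  }

  def main(args: Array[String]): Unit = {
    val before = TestVertex(0L, "label0", mutable.HashSet[String]())
    val after = vertexProgram(before, Vector(startString))
    println(before.labels.isEmpty) // true: the input was not touched
    println(after.labels)          // contains only "label0"
    println(before == after)       // false: the change is now visible
  }
}
```

With the copy in place, the old and new vertices differ even for a case class, 
which is what allows the change to be noticed.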

Hope this information is helpful to you.

> Pregel does not keep vertex state properly; fails to terminate 
> ---------------------------------------------------------------
>
>                 Key: SPARK-17097
>                 URL: https://issues.apache.org/jira/browse/SPARK-17097
>             Project: Spark
>          Issue Type: Bug
>          Components: GraphX
>    Affects Versions: 1.6.0
>         Environment: Scala 2.10.5, Spark 1.6.0 with GraphX and Pregel
>            Reporter: Seth Bromberger
>
> Consider the following minimum example:
> {code:title=PregelBug.scala|borderStyle=solid}
> package testGraph
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.graphx.{Edge, EdgeTriplet, Graph, _}
> object PregelBug {
>   def main(args: Array[String]) = {
>     //FIXME breaks if TestVertex is a case class; works if not case class
>     case class TestVertex(inId: VertexId,
>                      inData: String,
>                      inLabels: collection.mutable.HashSet[String]) extends 
> Serializable {
>       val id = inId
>       val value = inData
>       val labels = inLabels
>     }
>     class TestLink(inSrc: VertexId, inDst: VertexId, inData: String) extends 
> Serializable  {
>       val src = inSrc
>       val dst = inDst
>       val data = inData
>     }
>     val startString = "XXXSTARTXXX"
>     val conf = new SparkConf().setAppName("pregeltest").setMaster("local[*]")
>     val sc = new SparkContext(conf)
>     val vertexes = Vector(
>       new TestVertex(0, "label0", collection.mutable.HashSet[String]()),
>       new TestVertex(1, "label1", collection.mutable.HashSet[String]())
>     )
>     val links = Vector(
>       new TestLink(0, 1, "linkData01")
>     )
>     val vertexes_packaged = vertexes.map(v => (v.id, v))
>     val links_packaged = links.map(e => Edge(e.src, e.dst, e))
>     val graph = Graph[TestVertex, 
> TestLink](sc.parallelize(vertexes_packaged), sc.parallelize(links_packaged))
>     def vertexProgram (vertexId: VertexId, vdata: TestVertex, message: 
> Vector[String]): TestVertex = {
>       message.foreach {
>         case `startString` =>
>           if (vdata.id == 0L)
>             vdata.labels.add(vdata.value)
>         case m =>
>           if (!vdata.labels.contains(m))
>             vdata.labels.add(m)
>       }
>       new TestVertex(vdata.id, vdata.value, vdata.labels)
>     }
>     def sendMessage (triplet: EdgeTriplet[TestVertex, TestLink]): 
> Iterator[(VertexId, Vector[String])] = {
>       val srcLabels = triplet.srcAttr.labels
>       val dstLabels = triplet.dstAttr.labels
>       val msgsSrcDst = srcLabels.diff(dstLabels)
>         .map(label => (triplet.dstAttr.id, Vector[String](label)))
>       val msgsDstSrc = dstLabels.diff(dstLabels)
>         .map(label => (triplet.srcAttr.id, Vector[String](label)))
>       msgsSrcDst.toIterator ++ msgsDstSrc.toIterator
>     }
>     def mergeMessage (m1: Vector[String], m2: Vector[String]): Vector[String] 
> = m1.union(m2).distinct
>     val g = graph.pregel(Vector[String](startString))(vertexProgram, 
> sendMessage, mergeMessage)
>     println("---pregel done---")
>     println("vertex info:")
>     g.vertices.foreach(
>       v => {
>         val labels = v._2.labels
>         println(
>           "vertex " + v._1 +
>             ": name = " + v._2.id +
>             ", labels = " + labels)
>       }
>     )
>   }
> }
> {code}
> This code never terminates even though we expect it to. To fix, we simply 
> remove the "case" designation for the TestVertex class (see FIXME comment), 
> and then it behaves as expected.
> (Apologies if this has been fixed in later versions; we're unfortunately 
> pegged to 2.10.5 / 1.6.0 for now.)


