
MiKou updated SPARK-34517:
--------------------------
    Description: 
I am using 'Graph.aggregateMessages' and 'Graph.outerJoinVertices' together to 
implement an iterative graph algorithm that is very similar to 
'Graph.staticParallelPersonalizedPageRank'.

 

The bug appears when 'outerJoinVertices' does not change the data type of the 
vertices. More specifically, 'aggregateMessages' returns the same results for 
some iterations even though the vertex values have changed, and the attribute 
of each message depends entirely on the value of its source vertex.

 

I temporarily worked around the problem by changing the data type in 
'outerJoinVertices' and then changing it back with 'mapVertices'. This 
suggests that the problem is caused by incorrect updating of the activeSet 
when 'outerJoinVertices' keeps the data type of the vertices unchanged.
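
Roughly, the workaround looks like this (a minimal sketch; 'Boxed' is a 
throwaway wrapper introduced here only to force a different vertex data type):
{code:java}
// 'Boxed' exists only to make the output vertex type differ from the input
// type (Int, Vector[Double]), presumably keeping outerJoinVertices off its
// same-type incremental path, which is the suspected source of the bug.
case class Boxed(deg: Int, rank: Vector[Double])

rankGraph = prevRankGraph.outerJoinVertices(rankUpdates) {
  case (vid, (deg, rsd), msgSumOpt) =>
    Boxed(deg, msgSumOpt.getOrElse(zero) *:* (1.0 - alpha))
}.mapVertices { (vid, b) =>
  (b.deg, b.rank) // change the data type back
}.cache()
{code}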

 

Example code:
{code:java}
  // Vector operations are from Breeze: *:* is element-wise multiplication,
  // +:+ is element-wise addition.
  val prevRankGraph = rankGraph

  // Every vertex pushes its current rank vector, scaled by the edge weight,
  // to its out-neighbors; messages are summed per destination vertex.
  val rankUpdates = prevRankGraph.aggregateMessages[Vector[Double]](
    ctx => {
      println(ctx.srcId.nodeId, " pushing value: " + ctx.srcAttr._2)
      ctx.sendToDst(ctx.srcAttr._2 *:* ctx.attr)
    },
    (a: Vector[Double], b: Vector[Double]) => a +:+ b,
    TripletFields.Src
  )

  // Join the message sums back in. The vertex data type (Int, Vector[Double])
  // is NOT changed here, which is the case that triggers the bug.
  rankGraph = prevRankGraph.outerJoinVertices(rankUpdates) {
    case (vid, (deg, rsd), msgSumOpt) =>
      println(vid.nodeId, " value: " + rsd.toString())
      (deg, msgSumOpt.getOrElse(zero) *:* (1.0 - alpha))
  }.cache()
{code}
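
For context, the snippet assumes roughly the following surrounding state (a 
sketch rather than the exact job code: 'graph' is loaded from the edge list 
shown below, and alpha = 0.2 is inferred from the 0.4 / 0.16 / 0.32 values in 
the logs):
{code:java}
import breeze.linalg.{SparseVector, Vector}
import org.apache.spark.graphx._

// Assumed constants: 10 vertices, personalization source = vertex 3,
// alpha = 0.2 (consistent with the 0.4 / 0.16 / 0.32 values in the logs).
val alpha = 0.2
val zero: Vector[Double] = SparseVector.zeros[Double](10)

// Vertex attribute: (out-degree, rank vector); edge attribute: 1 / outDeg(src).
var rankGraph: Graph[(Int, Vector[Double]), Double] = graph
  .outerJoinVertices(graph.outDegrees) { (vid, attr, degOpt) => degOpt.getOrElse(0) }
  .mapTriplets(e => 1.0 / e.srcAttr)
  .mapVertices { (vid, deg) =>
    val v = SparseVector.zeros[Double](10)
    if (vid == 3L) v(3) = 1.0 // the source vertex starts with rank 1.0
    (deg, v: Vector[Double])
  }
  .cache()
{code}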
 

Example graph:
{code:java}
0 6
1 0
1 5
1 6
2 0
2 7
3 2
3 6
4 7
5 1
6 8
7 5
7 9
8 0
9 6
{code}
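
The edge list can be loaded with GraphX's built-in loader ('edges.txt' is a 
placeholder path; the loader sets every edge attribute to 1, which the 
'mapTriplets' step above replaces with 1/out-degree):
{code:java}
import org.apache.spark.graphx.GraphLoader

// Parses space-separated "srcId dstId" lines into a Graph[Int, Int].
val graph = GraphLoader.edgeListFile(sc, "edges.txt")
{code}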
 

Log of Executor1:
{code:java}
(1, pushing value: SparseVector(10)())
(1, pushing value: SparseVector(10)())
(1, pushing value: SparseVector(10)())
(5, pushing value: SparseVector(10)())
(1, value: SparseVector(10)())
(2, value: SparseVector(10)())
(3, value: SparseVector(10)((3,1.0)))
(8, value: SparseVector(10)())
(1, pushing value: SparseVector(10)())
(1, pushing value: SparseVector(10)())
(1, pushing value: SparseVector(10)())
(5, pushing value: SparseVector(10)())
(1, value: SparseVector(10)())
(2, value: SparseVector(10)((3,0.4)))
#On the vertex view, node-3 still has no value in the second round.#
(3, value: SparseVector(10)())
(8, value: SparseVector(10)())
{code}
Log of Executor2:
{code:java}
(3, pushing value: SparseVector(10)((3,1.0)))
(3, pushing value: SparseVector(10)((3,1.0)))
(7, pushing value: SparseVector(10)())
(7, pushing value: SparseVector(10)())
(9, pushing value: SparseVector(10)())
(9, value: SparseVector(10)())
(0, value: SparseVector(10)())
#On the edge view, the value of node-3 does not change in the second round.#
(3, pushing value: SparseVector(10)((3,1.0)))
(3, pushing value: SparseVector(10)((3,1.0)))
(7, pushing value: SparseVector(10)())
(7, pushing value: SparseVector(10)())
(9, pushing value: SparseVector(10)())
(9, value: SparseVector(10)())
(0, value: SparseVector(10)())
(3, pushing value: SparseVector(10)())
(3, pushing value: SparseVector(10)())
(7, pushing value: SparseVector(10)((3,0.16000000000000003)))
(7, pushing value: SparseVector(10)((3,0.16000000000000003)))
(9, pushing value: SparseVector(10)())
(6, value: SparseVector(10)())
{code}
Log of Executor3:
{code:java}
(0, pushing value: SparseVector(10)())
(2, pushing value: SparseVector(10)())
(2, pushing value: SparseVector(10)())
(9, value: SparseVector(10)())
(0, value: SparseVector(10)())
(0, pushing value: SparseVector(10)())
#The value of node-2 is updated correctly on the edge view.#
(2, pushing value: SparseVector(10)((3,0.4)))
(2, pushing value: SparseVector(10)((3,0.4)))
(9, value: SparseVector(10)())
(0, value: SparseVector(10)())
(0, pushing value: SparseVector(10)((3,0.16000000000000003)))
#The value of node-2 is also updated correctly in the third round.#
(2, pushing value: SparseVector(10)())
(2, pushing value: SparseVector(10)())
(9, value: SparseVector(10)())
(0, value: SparseVector(10)((3,0.16000000000000003)))
{code}
Log of Executor4:
{code:java}
(4, pushing value: SparseVector(10)())
(6, pushing value: SparseVector(10)())
(8, pushing value: SparseVector(10)())
(4, value: SparseVector(10)())
(7, value: SparseVector(10)())
(5, value: SparseVector(10)())
(4, pushing value: SparseVector(10)())
#The value of node-6 is updated correctly on the edge view.#
(6, pushing value: SparseVector(10)((3,0.4)))
(8, pushing value: SparseVector(10)())
(4, value: SparseVector(10)())
(7, value: SparseVector(10)())
(5, value: SparseVector(10)())
(4, pushing value: SparseVector(10)())
#The value of node-6 is also updated correctly in the third round.#
(6, pushing value: SparseVector(10)())
(8, pushing value: SparseVector(10)((3,0.32000000000000006)))
(4, value: SparseVector(10)())
(7, value: SparseVector(10)((3,0.16000000000000003)))
(5, value: SparseVector(10)())
{code}
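
For reference, the logged values are consistent with the update rule 
newRank(dst) = sum over in-edges of rank(src) × (1/outDeg(src)) × (1 − alpha), 
with alpha = 0.2: vertex 3 (out-degree 2) starts at 1.0, so vertex 2 should 
receive 1.0 × 0.5 × 0.8 = 0.4; vertex 2 (out-degree 2) then pushes 0.4, so 
vertices 0 and 7 should receive 0.4 × 0.5 × 0.8 = 0.16; and vertex 6 pushes 
0.4 to its only out-neighbor 8, which should receive 0.4 × 1.0 × 0.8 = 0.32. 
Executors 3 and 4 match these values, while Executor 1's vertex view and 
Executor 2's edge view still show the stale value for vertex 3.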

> Unexpected result when using 'aggregateMessages' with 'outerJoinVertices'
> -------------------------------------------------------------------------
>
>                 Key: SPARK-34517
>                 URL: https://issues.apache.org/jira/browse/SPARK-34517
>             Project: Spark
>          Issue Type: Bug
>          Components: GraphX
>    Affects Versions: 3.0.2
>         Environment: OpenJDK 1.8.0_275
> Scala 2.12.10
> Hadoop 3.3.0
> Spark 3.0.2
>            Reporter: MiKou
>            Priority: Critical
>


