[ https://issues.apache.org/jira/browse/SPARK-34517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

MiKou updated SPARK-34517:
--------------------------
    Summary: breeze.linalg.Vector.equals always return true may bring error.  (was: Unexpected result when using 'aggregateMessages' with 'outerJoinVertices')

> breeze.linalg.Vector.equals always return true may bring error.
> ---------------------------------------------------------------
>
>                 Key: SPARK-34517
>                 URL: https://issues.apache.org/jira/browse/SPARK-34517
>             Project: Spark
>          Issue Type: Bug
>          Components: GraphX
>    Affects Versions: 3.0.2
>         Environment: OpenJDK 1.8.0_275
> Scala 2.12.10
> Hadoop 3.3.0
> Spark 3.0.2
>            Reporter: MiKou
>            Priority: Major
>
> I use 'Graph.aggregateMessages' and 'Graph.outerJoinVertices' together to
> implement an iterative graph algorithm that is very similar to
> 'Graph.staticParallelPersonalizedPageRank'.
>
> The bug appears when 'outerJoinVertices' does not change the data type of
> the vertices. More specifically, in some iterations 'aggregateMessages'
> returns the same results as in the previous iteration, even though the
> vertex values have changed. Since each message depends only on the value of
> its source vertex, the messages should have changed as well.
> Example code:
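> {code:java}
>   import breeze.linalg.Vector
>   import org.apache.spark.graphx._
>
>   // Assumed in scope (not shown in the report): a mutable `rankGraph` whose vertex
>   // attribute is (deg, rank vector), an all-zero vector `zero`, the constant `alpha`,
>   // and a helper `nodeId` on vertex ids (not a Spark method).
>   val prevRankGraph = rankGraph
>   // Each message depends only on the source vertex's rank vector and the edge attribute.
>   val rankUpdates = prevRankGraph.aggregateMessages[Vector[Double]](
>     ctx => {
>       println(ctx.srcId.nodeId, " pushing value: " + ctx.srcAttr._2)
>       ctx.sendToDst(ctx.srcAttr._2 *:* ctx.attr)
>     },
>     (a : Vector[Double], b : Vector[Double]) => a +:+ b,
>     TripletFields.Src
>   )
>   // The vertex type is unchanged: (deg, rsd) in, (deg, new rank vector) out.
>   rankGraph = prevRankGraph.outerJoinVertices(rankUpdates) { case (vid, (deg, rsd), msgSumOpt) =>
>     println(vid.nodeId, " value: " + rsd.toString())
>     (deg, msgSumOpt.getOrElse(zero) *:* (1.0 - alpha))
>   }.cache()
> {code}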
>  
> As a temporary workaround, I change the vertex data type in
> 'outerJoinVertices' and then change it back with 'mapVertices'. This suggests
> that the problem is probably caused by incorrect updating of the activeSet
> when 'outerJoinVertices' keeps the vertex data type.
> The following code works correctly:
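> {code:java}
>     // Wrapping the result in ((), ...) changes the vertex type, so the join no longer
>     // preserves it; mapVertices then restores the original (deg, rank vector) shape.
>     rankGraph = prevRankGraph.outerJoinVertices(rankUpdates) { case (vid, (deg, rsd), msgSumOpt) =>
>       println(vid.nodeId, " value: " + rsd.toString())
>       ((), (deg, msgSumOpt.getOrElse(zero) *:* (1.0 - alpha)))
>     }.mapVertices { case (_, (_, data)) => data }.cache()
> {code}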
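>
> The model below shows why the workaround helps. It rests on one assumption
> about GraphX internals. A type-preserving 'outerJoinVertices' ships to the
> edge partitions only the vertices whose new value compares unequal to the
> old one. A type-changing join ships every value again. 'refreshReplicas',
> 'brokenEq' and 'correctEq' are hypothetical names, and nothing below calls
> GraphX. Plain Seq[Double] values stand in for the rank vectors, with 4
> dimensions instead of 10.
> {code:scala}
> // Simplified, hypothetical model of refreshing the vertex copies used as srcAttr.
> def refreshReplicas[V](
>     replicas: Map[Long, V],       // copies currently seen on the edge side
>     updated: Map[Long, V],        // vertex values after the join
>     typePreserved: Boolean,       // true: the join keeps the vertex type
>     sameValue: (V, V) => Boolean  // equality used to detect changed vertices
> ): Map[Long, V] =
>   if (typePreserved) {
>     // Assumed behaviour: only vertices detected as changed are shipped.
>     val changed = updated.filter { case (id, v) =>
>       replicas.get(id).forall(old => !sameValue(old, v))
>     }
>     replicas ++ changed
>   } else {
>     // Assumed behaviour: a new vertex type forces every value to be shipped.
>     updated
>   }
>
> // Mimics the reported bug: vectors of equal dimension always compare equal.
> val brokenEq: (Seq[Double], Seq[Double]) => Boolean = (a, b) => a.length == b.length
> val correctEq: (Seq[Double], Seq[Double]) => Boolean = (a, b) => a == b
>
> // Round-1 edge-side copies and round-2 vertex values for nodes 2 and 3.
> val round1 = Map(2L -> Seq(0.0, 0.0, 0.0, 0.0), 3L -> Seq(0.0, 0.0, 0.0, 1.0))
> val round2 = Map(2L -> Seq(0.0, 0.0, 0.0, 0.4), 3L -> Seq(0.0, 0.0, 0.0, 0.0))
>
> // true: with the broken equality the edge side keeps the round-1 values
> println(refreshReplicas(round1, round2, typePreserved = true, brokenEq) == round1)
> // true: changing the vertex type ships everything, so the copies are fresh
> println(refreshReplicas(round1, round2, typePreserved = false, brokenEq) == round2)
> // true: with a content-aware equality the type-preserving path is also fresh
> println(refreshReplicas(round1, round2, typePreserved = true, correctEq) == round2)
> {code}
> Under that assumption, an equals that misses real changes leaves srcAttr
> stale only on the type-preserving path. That would be consistent with the
> logs below.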
>  
> After further testing, I found that the bug occurs because
> breeze.linalg.SparseVector.equals always returns true when the two vectors
> have the same dimension.
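>
> The dependency-free sketch below illustrates this claim. It also shows why
> the claim affects the (deg, rsd) vertex tuples. 'SVec' is a hypothetical
> stand-in, not Breeze: its equals deliberately mimics the behaviour reported
> here by comparing dimensions only. 'sameContents' is a hypothetical
> content-aware check. Tuple equality compares elements with ==, so a tuple
> that contains such a vector inherits the flaw.
> {code:scala}
> // Hypothetical sparse vector: a dimension plus its stored entries (NOT Breeze).
> final class SVec(val length: Int, val entries: Map[Int, Double]) {
>   // Mimics the reported behaviour: equal dimensions always compare equal.
>   override def equals(other: Any): Boolean = other match {
>     case that: SVec => this.length == that.length
>     case _          => false
>   }
>   override def hashCode: Int = length
> }
>
> // Content-aware check: same dimension and same non-zero entries.
> def sameContents(a: SVec, b: SVec): Boolean =
>   a.length == b.length &&
>     a.entries.filter(_._2 != 0.0) == b.entries.filter(_._2 != 0.0)
>
> val node3Round1 = new SVec(10, Map(3 -> 1.0)) // like SparseVector(10)((3,1.0))
> val node3Round2 = new SVec(10, Map.empty)     // like SparseVector(10)()
>
> println(node3Round1 == node3Round2)              // true: the change is invisible to ==
> println((2, node3Round1) == (2, node3Round2))    // true: the tuple delegates to SVec.equals
> println(sameContents(node3Round1, node3Round2))  // false: the contents really differ
> {code}
> The content check treats an explicitly stored 0.0 as absent. That is an
> assumption about the intended semantics for sparse vectors.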
>  
> An example dataset and logs of the issue are given below.
> Example graph (an edge list, one 'srcId dstId' pair per line):
> {code:java}
> 0 6
> 1 0
> 1 5
> 1 6
> 2 0
> 2 7
> 3 2
> 3 6
> 4 7
> 5 1
> 6 8
> 7 5
> 7 9
> 8 0
> 9 6
> {code}
>  
> Log of Executor1:
> {code:java}
> (0, pushing value: SparseVector(10)())
> (2, pushing value: SparseVector(10)())
> (2, pushing value: SparseVector(10)())
> (1, value: SparseVector(10)())
> (2, value: SparseVector(10)())
> (3, value: SparseVector(10)((3,1.0)))
> (8, value: SparseVector(10)())
> (0, pushing value: SparseVector(10)())
> (2, pushing value: SparseVector(10)())
> (2, pushing value: SparseVector(10)())
> (1, value: SparseVector(10)())
> #On the vertex view, the value of node-2 is updated in the second round.# 
> (2, value: SparseVector(10)((3,0.4)))
> #On the vertex view, node-3 still has no value in the second round.#
> (3, value: SparseVector(10)())
> (8, value: SparseVector(10)())
> (0, pushing value: SparseVector(10)())
> #On the edge view, node-2 has no value in the third round.#
> (2, pushing value: SparseVector(10)())
> (2, pushing value: SparseVector(10)())
> (1, value: SparseVector(10)())
> (2, value: SparseVector(10)((3,0.4)))
> (3, value: SparseVector(10)())
> (8, value: SparseVector(10)())
> {code}
> Log of Executor2:
> {code:java}
> (3, pushing value: SparseVector(10)((3,1.0)))
> (3, pushing value: SparseVector(10)((3,1.0)))
> (7, pushing value: SparseVector(10)())
> (7, pushing value: SparseVector(10)())
> (9, pushing value: SparseVector(10)())
> (6, value: SparseVector(10)())
> #On the edge view, the value of node-3 does not change in the second round.#
> (3, pushing value: SparseVector(10)((3,1.0)))
> (3, pushing value: SparseVector(10)((3,1.0)))
> (7, pushing value: SparseVector(10)())
> (7, pushing value: SparseVector(10)())
> (9, pushing value: SparseVector(10)())
> (6, value: SparseVector(10)((3,0.4)))
> #The same error also happens in the third round.#
> (3, pushing value: SparseVector(10)((3,1.0)))
> (3, pushing value: SparseVector(10)((3,1.0)))
> (7, pushing value: SparseVector(10)())
> (7, pushing value: SparseVector(10)())
> (9, pushing value: SparseVector(10)())
> (6, value: SparseVector(10)((3,0.4)))
> {code}
> The logs of executor3 and executor4 are omitted because they are similar to
> the above.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)