[ https://issues.apache.org/jira/browse/SPARK-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-5036:
-----------------------------------
Assignee: Apache Spark
> Better support sending partial messages in Pregel API
> -----------------------------------------------------
>
> Key: SPARK-5036
> URL: https://issues.apache.org/jira/browse/SPARK-5036
> Project: Spark
> Issue Type: Improvement
> Components: GraphX
> Reporter: shijinkui
> Assignee: Apache Spark
> Attachments: s1.jpeg, s2.jpeg
>
>
> Better support sending partial messages in Pregel API
> 1. The requirement
> In many iterative graph algorithms, only some of the vertices (we call them
> ActiveVertexes) need to send messages to their neighbours in each iteration.
> In many cases, ActiveVertexes are the vertices whose attributes changed
> between the previous and the current iteration. To implement this
> requirement, we can use the Pregel API plus a flag (e.g., `isAttrChanged: Boolean`)
> in each vertex's attribute.
> However, after `aggregateMessages` or `mapReduceTriplets` in each iteration,
> we need to reset this flag to its initial value on every vertex, which requires
> an expensive `joinVertices`.
> We found a more efficient way to meet this requirement and would like to
> discuss it here.
> Consider the following simple example.
> In the i-th iteration, the previous attribute of each vertex is `Attr` and the
> newly computed attribute is `NewAttr`:
> |VID| Attr| NewAttr| Neighbours|
> |:----|:-----|:----|:------|
> | 1 | 4| 5| 2, 3 |
> | 2 | 3| 2| 1, 4 |
> | 3 | 2| 2| 1, 4 |
> | 4| 3| 4| 1, 2, 3 |
> Our requirement is:
> 1. Set each vertex's `Attr` to `NewAttr` in the i-th iteration.
> 2. For each vertex whose `Attr != NewAttr`, send a message to its neighbours
> in the next iteration's `aggregateMessages`.
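>
> To make the two requirements concrete, here is a small in-memory Scala sketch of the i-th iteration on the table above. It uses plain `Map`s instead of GraphX RDDs, and the names `ActiveVertexExample`, `Row`, `attrs` and `neighbours` are illustrative only. Vertex 3 has `Attr == NewAttr`, so it is the only vertex that sends nothing.
>
> ```scala
> // In-memory sketch of the i-th iteration from the example table (not GraphX code).
> object ActiveVertexExample {
>   type VertexId = Long
>
>   // One table row: the previous and the newly computed attribute.
>   final case class Row(attr: Int, newAttr: Int)
>
>   // Attr / NewAttr for each vertex, copied from the table.
>   val attrs: Map[VertexId, Row] =
>     Map(1L -> Row(4, 5), 2L -> Row(3, 2), 3L -> Row(2, 2), 4L -> Row(3, 4))
>
>   // Neighbour lists, copied from the table.
>   val neighbours: Map[VertexId, Seq[VertexId]] =
>     Map(1L -> Seq(2L, 3L), 2L -> Seq(1L, 4L), 3L -> Seq(1L, 4L), 4L -> Seq(1L, 2L, 3L))
>
>   // Requirement 1: every vertex now carries NewAttr as its attribute.
>   val updated: Map[VertexId, Int] =
>     attrs.map { case (vid, r) => vid -> r.newAttr }
>
>   // Requirement 2: only vertices with Attr != NewAttr are active senders.
>   val active: Set[VertexId] =
>     attrs.filter { case (_, r) => r.attr != r.newAttr }.keySet
>
>   // (sender, receiver) pairs for the next iteration's message round.
>   val messages: Seq[(VertexId, VertexId)] =
>     active.toSeq.sorted.flatMap(src => neighbours(src).map(dst => src -> dst))
>
>   def main(args: Array[String]): Unit = {
>     println(s"updated  = $updated")
>     println(s"active   = ${active.toSeq.sorted}")
>     println(s"messages = $messages")
>   }
> }
> ```
>
> Vertices 1, 2 and 4 are active and produce seven messages in total; vertex 3 only receives.
>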
> We found it hard to implement this requirement efficiently with the current
> Pregel API. The reason is that we not only need to run `pregel()` to
> compute `NewAttr` (2) but also need to run an outer join (`outerJoinVertices()`)
> to satisfy (1).
> A simple idea is to keep an `isAttrChanged: Boolean` (solution 1) or a
> `flag: Int` (solution 2) in each vertex's attribute.
> 2. Two solutions
> -----------
> 2.1 Solution 1: label and reset `isAttrChanged: Boolean` in the vertex attribute
> 
> 1. Initialize messages with `aggregateMessages`,
> which returns a message RDD.
> 2. `innerJoin`:
> run the user-defined `vprog` on each vertex that received messages; this
> returns a new VertexRDD holding the computed values, with `isAttrChanged = true`.
> 3. `outerJoinVertices`:
> merge the changed vertices into the whole graph, producing the new graph.
> 4. `aggregateMessages`, which returns a message RDD.
> 5. `joinVertices`: reset `isAttrChanged` of every vertex attribute to false.
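> ```scala
> // Reset isAttrChanged to false on every vertex. This joins against the whole
> // vertex set in every iteration, which is what makes solution 1 expensive.
> // reset() is a method of the user-defined vertex attribute class; it must
> // return the (reset) attribute, because joinVertices expects a VD back.
> g = updateG.joinVertices(updateG.vertices) {
>   (vid, oriVertex, updateGVertex) => updateGVertex.reset()
> }
> ```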
> The `isAttrChanged` field of every vertex attribute must be reset to false
> here. If it is not reset, vertices flagged in an earlier iteration will send
> messages again in the next iteration even if their attributes did not change.
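>
> The five steps of solution 1 can be sketched as a local simulation in plain Scala. This is not GraphX code: `Map`s stand in for `VertexRDD`s, and min-label propagation (each vertex keeps the smallest label it has seen) is a hypothetical stand-in for the user's `vprog`. The sketch assumes the flag is set only when `vprog` actually changes the value, as requirement 2 asks. `resetWork` counts the vertices touched by the step-5 reset; it grows by the full vertex count in every iteration.
>
> ```scala
> // Local simulation of solution 1 (not GraphX code): Maps stand in for VertexRDDs and
> // min-label propagation is a hypothetical stand-in for the user's vprog.
> object Solution1Sketch {
>   type VertexId = Long
>
>   final case class VAttr(value: Long, isAttrChanged: Boolean) {
>     def reset(): VAttr = copy(isAttrChanged = false)
>   }
>
>   // Same four-vertex graph as the example table.
>   val neighbours: Map[VertexId, Seq[VertexId]] =
>     Map(1L -> Seq(2L, 3L), 2L -> Seq(1L, 4L), 3L -> Seq(1L, 4L), 4L -> Seq(1L, 2L, 3L))
>
>   // aggregateMessages: only flagged vertices send their value; messages are merged with min.
>   def aggregate(g: Map[VertexId, VAttr]): Map[VertexId, Long] =
>     g.toSeq
>       .filter { case (_, a) => a.isAttrChanged }
>       .flatMap { case (src, a) => neighbours(src).map(dst => dst -> a.value) }
>       .groupBy(_._1)
>       .map { case (dst, ms) => dst -> ms.map(_._2).min }
>
>   // Returns (final graph, iterations run, total vertices touched by the reset step).
>   def run(maxIter: Int): (Map[VertexId, VAttr], Int, Int) = {
>     // Every vertex starts flagged so that it sends its own id once.
>     val init = neighbours.keys.map(v => v -> VAttr(v, isAttrChanged = true)).toMap
>     var msgs = aggregate(init)                                // step 1
>     var g = init.map { case (vid, a) => vid -> a.reset() }
>     var iter = 0
>     var resetWork = 0
>     while (msgs.nonEmpty && iter < maxIter) {
>       // step 2 (innerJoin): vprog runs only on vertices that received a message;
>       // the flag is set only if the value actually changed
>       val changed = msgs.map { case (vid, m) =>
>         val old = g(vid)
>         val nv = math.min(old.value, m)
>         vid -> VAttr(nv, isAttrChanged = (nv != old.value))
>       }
>       // step 3 (outerJoinVertices): merge the updated vertices into the graph
>       val updated = g.map { case (vid, a) => vid -> changed.getOrElse(vid, a) }
>       msgs = aggregate(updated)                               // step 4
>       // step 5 (joinVertices): reset the flag on every vertex, changed or not
>       g = updated.map { case (vid, a) => vid -> a.reset() }
>       resetWork += g.size
>       iter += 1
>     }
>     (g, iter, resetWork)
>   }
> }
> ```
>
> On the four-vertex graph, `run(10)` converges after three iterations with every label equal to 1, and the reset step touches all four vertices each time (12 vertex updates in total), regardless of how few vertices actually changed.
>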
> **Result:**
> * Edges: 890041895
> * Vertices: 181640208
> * Iterations: 150
> * Total cost: 8.4h
> * Could not run until no messages remained (i.e., until convergence)
> 2.2 Solution 2: color vertices
> 
> Iteration process:
> 1. `innerJoin`
> `vprog` is used as a partially applied function, e.g. `vprog(curIter, _:
> VertexId, _: VD, _: A)`, where
> `i = i + 1; val curIter = i`.
> In `vprog`, the user can read `curIter` and assign it to `flag`.
> 2. `outerJoinVertices`
> `graph = graph.outerJoinVertices(changedVerts) { (vid, old, newOpt) =>
> newOpt.getOrElse(old)}.cache()`
> 3. `aggregateMessages`
> `sendMsg` is a partially applied function, e.g. `sendMsg(curIter, _:
> EdgeContext[VD, ED, A])`.
> **In `sendMsg`, compare `curIter` with `flag` to decide whether to
> send a message.**
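>
> Solution 2 can be simulated locally in plain Scala. This is a sketch rather than GraphX code: `Map`s stand in for `VertexRDD`s, min-label propagation is a hypothetical `vprog`, and a `Seq` of `(dst, msg)` pairs stands in for `EdgeContext.sendToDst`. `flag` stores the iteration in which a vertex last changed, so `sendMsg` only has to check `flag == curIter` and no reset pass over all vertices is needed. The partial applications mirror `vprog(curIter, _: VertexId, _: VD, _: A)` and `sendMsg(curIter, _: EdgeContext[VD, ED, A])`.
>
> ```scala
> // Local simulation of solution 2 (not GraphX code): flag = iteration of the last change.
> // Min-label propagation is a hypothetical stand-in for the user's vprog.
> object Solution2Sketch {
>   type VertexId = Long
>   final case class VAttr(value: Long, flag: Int)
>
>   // Same four-vertex graph as the example table.
>   val neighbours: Map[VertexId, Seq[VertexId]] =
>     Map(1L -> Seq(2L, 3L), 2L -> Seq(1L, 4L), 3L -> Seq(1L, 4L), 4L -> Seq(1L, 2L, 3L))
>
>   // vprog takes the current iteration as an extra first parameter and stamps it into flag.
>   def vprog(curIter: Int, vid: VertexId, attr: VAttr, msg: Long): VAttr = {
>     val nv = math.min(attr.value, msg)
>     if (nv != attr.value) VAttr(nv, flag = curIter) else attr
>   }
>
>   // sendMsg also takes curIter; a vertex sends only if it changed in this iteration.
>   def sendMsg(curIter: Int, src: VertexId, attr: VAttr): Seq[(VertexId, Long)] =
>     if (attr.flag == curIter) neighbours(src).map(dst => dst -> attr.value) else Seq.empty
>
>   // aggregateMessages with min as the merge function.
>   def aggregate(g: Map[VertexId, VAttr],
>                 send: (VertexId, VAttr) => Seq[(VertexId, Long)]): Map[VertexId, Long] =
>     g.toSeq.flatMap { case (vid, a) => send(vid, a) }
>       .groupBy(_._1)
>       .map { case (dst, ms) => dst -> ms.map(_._2).min }
>
>   // Returns (final graph, iterations run).
>   def run(maxIter: Int): (Map[VertexId, VAttr], Int) = {
>     var g: Map[VertexId, VAttr] = neighbours.keys.map(v => v -> VAttr(v, flag = 0)).toMap
>     var msgs = aggregate(g, sendMsg(0, _: VertexId, _: VAttr)) // every vertex sends once
>     var i = 0
>     while (msgs.nonEmpty && i < maxIter) {
>       i = i + 1
>       val curIter = i
>       val prog = vprog(curIter, _: VertexId, _: VAttr, _: Long) // partially applied
>       // 1. innerJoin: run vprog on vertices that received a message
>       val changed = msgs.map { case (vid, m) => vid -> prog(vid, g(vid), m) }
>       // 2. outerJoinVertices: merge into the graph; untouched vertices keep their old flag
>       g = g.map { case (vid, a) => vid -> changed.getOrElse(vid, a) }
>       // 3. aggregateMessages: only vertices with flag == curIter send
>       msgs = aggregate(g, sendMsg(curIter, _: VertexId, _: VAttr))
>     }
>     (g, i)
>   }
> }
> ```
>
> On the four-vertex graph this sketch also converges after three iterations with every label equal to 1. A vertex that receives a message but does not change keeps its older `flag` and stays silent, so the extra `joinVertices` pass of solution 1 is not needed.
>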
> #### Result
> Input data:
> * vertices: 181640208
> * edges: 890041895
>
> | | average cost per iteration | cost of 150 iterations | cost of 420 iterations |
> | ------------ | ------------- | ------------ | ------------ |
> | solution 1 | 188 s | 7.8h | cannot finish |
> | solution 2 | 24 s | 1.2h | 3.1h |
> | speedup | ~7x | 6.5x | only solution 2 finishes (3.1h) |
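>
> As a rough consistency check on the table, reading the per-iteration averages as seconds (the reading under which they agree with the 150-iteration totals), the figures work out as follows. The solution 2 product comes out somewhat below the reported 1.2h, so treat these numbers as approximate.
>
> $$
> \begin{aligned}
> \text{solution 1:}\quad & 188\,\text{s} \times 150 = 28\,200\,\text{s} \approx 7.8\,\text{h} \\
> \text{solution 2:}\quad & 24\,\text{s} \times 150 = 3\,600\,\text{s} = 1.0\,\text{h} \\
> \text{speedup:}\quad & 188 / 24 \approx 7.8, \qquad 7.8\,\text{h} / 1.2\,\text{h} = 6.5
> \end{aligned}
> $$
>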
>
> ## Conclusion
>
> I think the second solution (Pregel + a flag) is better.
> It efficiently supports iterative graph algorithms in which only some of the
> vertices send messages to their neighbours in each iteration.
> We plan to use it in our production environment.
> PR: https://github.com/apache/spark/pull/3866
> ----EOF----
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)