Claudio Martella commented on GIRAPH-45:

Yes, I agree, we don't. As a matter of fact, for the last two days I've been
working on my own implementation of the previous model. It's going quite
smoothly: my prototype, not yet integrated with Giraph, should be ready by the
end of today. If you're in a hurry, I can upload it as an attachment as soon
as it's complete (again as an external class, not as a Giraph patch), and you
can help me out (much appreciated) with the integration (as a Giraph patch
this time =) ).

The prototype implements a Memstore + SequenceFile approach to store the (I
vertex, M message) pairs *sorted* on disk. Since we don't need durability,
deletes or updates, this turned out to be quite simple and quite
disk-efficient. We have the typical memstore where we store the data; when we
reach a given threshold (maxSize or some %freeHeap) we flush it (sorted by
vertex) to disk and keep a small BTree for that SequenceFile in memory. As we
don't have updates or deletes, we don't really need to merge as in BigTable;
as we don't do random reads, we don't need compaction; as we don't need
durability, we don't need log files. Before proceeding with the compute() of
a given vertex, we ask all the SequenceFiles to seek to the first message for
that vertex, if any, and we read sequentially across memstore and files. If we
don't send many messages, we basically never touch the disk but just the
memstore, and we fall back to the in-memory-only approach that is in Giraph
right now, without any special code to handle the two cases. Also, when we
read, we have a stream with ALL the messages to the vertex, which should make
it quite combiner-efficient.
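
A minimal sketch of the flush-and-read path described above, in plain Java.
This is not the prototype itself: the class name SpillingMessageStore, the
String messages, the long vertex ids and the temp-file format are all
assumptions made for illustration. A plain data file stands in for the
SequenceFile, and a per-run TreeMap (vertex id -> byte offset) stands in for
the small in-memory BTree.

```java
import java.io.*;
import java.nio.file.*;
import java.util.*;

/** Hypothetical sketch, not Giraph code: memstore plus sorted on-disk runs. */
class SpillingMessageStore {
    private final int maxInMemory;  // flush threshold, counted in messages
    private final TreeMap<Long, List<String>> memstore = new TreeMap<>();
    private int inMemory = 0;
    // One entry per spilled run: the file and its index (vertex id -> offset).
    private final List<Path> runs = new ArrayList<>();
    private final List<TreeMap<Long, Long>> indexes = new ArrayList<>();

    SpillingMessageStore(int maxInMemory) { this.maxInMemory = maxInMemory; }

    void add(long vertexId, String message) {
        memstore.computeIfAbsent(vertexId, k -> new ArrayList<>()).add(message);
        if (++inMemory >= maxInMemory) flush();
    }

    /** Writes the memstore to a new run, sorted by vertex id, and indexes it. */
    private void flush() {
        TreeMap<Long, Long> index = new TreeMap<>();
        try {
            Path run = Files.createTempFile("messages-", ".run");
            run.toFile().deleteOnExit();
            try (DataOutputStream out = new DataOutputStream(
                    new BufferedOutputStream(Files.newOutputStream(run)))) {
                // TreeMap iteration order is ascending by vertex id.
                for (Map.Entry<Long, List<String>> e : memstore.entrySet()) {
                    index.put(e.getKey(), (long) out.size()); // bytes so far
                    out.writeLong(e.getKey());
                    out.writeInt(e.getValue().size());
                    for (String m : e.getValue()) out.writeUTF(m);
                }
            }
            runs.add(run);
            indexes.add(index);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        memstore.clear();
        inMemory = 0;
    }

    /** All messages for one vertex: spilled runs (oldest first), then memstore. */
    List<String> messagesFor(long vertexId) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < runs.size(); i++) {
            Long offset = indexes.get(i).get(vertexId);
            if (offset == null) continue;  // this run holds nothing for the vertex
            try (RandomAccessFile in =
                     new RandomAccessFile(runs.get(i).toFile(), "r")) {
                in.seek(offset);
                in.readLong();             // vertex id, already known
                int count = in.readInt();
                for (int j = 0; j < count; j++) result.add(in.readUTF());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        result.addAll(memstore.getOrDefault(vertexId, Collections.emptyList()));
        return result;
    }

    int spilledRuns() { return runs.size(); }
}
```

If the threshold is never reached, spilledRuns() stays at 0 and messagesFor()
only ever reads the memstore, which is the in-memory-only case. The sketch
reopens each run on every lookup and indexes every vertex; a real
implementation would presumably keep the readers open and use a sparser
index.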

Basically, if senders also partially sorted messages prior to sending, what we
would have is something very similar to the shuffle & sort phase of MR, which
brings me to some considerations about your points.

As I see it, what makes MR not so efficient for iterative graph processing,
apart from job boot time, is that it forwards not only the messages but also
the graph structure/topology between the map and reduce phases and between
jobs. On our side, we never hit the disk for messages and we keep the graph
structure stateful in memory, so we have a double win (which makes it quite
difficult to sell a comparison between MR and Pregel). If we start hitting the
disk (and, heavily, the network, as you're proposing HDFS: bear in mind that A
might write to DFS node B data that is actually directed to C => double
network access, a bit as if mappers wrote to HDFS in MapReduce) to save the
graph and the messages, we had better just use MapReduce in the first place.
Of course, I see that you're saving the repeated job boot time and that you're
not actually writing EVERYTHING, only what doesn't fit in memory, but I have a
possibly simpler idea that goes in the same direction. If we start from the
assumption that we can keep at least the graph in memory, we never have to
write it to disk and only write the messages to disk when necessary (in our
case more messages would hit the disk than in your scenario, because we'd have
more memory pressure from keeping the whole graph in memory). But because we
have a quite memory-efficient way of storing the messages on disk, in sorted
order, we can take advantage of it when we run the superstep. Suppose the
vertices in each partition are sorted by vertexID, just as their messages are
sorted on disk. Iterating over the vertices of that partition and scanning the
messages directed to them would then be a linear 1-1 mapping, meaning we'd
never have to seek in the on-disk message box. What this leads to is something
similar to MapReduce in how the values/messages are handled, but still
Pregel-ish in how the structure is kept in memory.
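
The linear 1-1 scan can be sketched as a merge join over two sorted
sequences. Again this is only an illustration under assumptions: the names
SortedSuperstepScan and Entry are hypothetical, messages are Strings, vertex
ids are longs, and the message iterator stands in for a sequential read of
the sorted on-disk message box.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;

/** Hypothetical sketch: one forward pass over sorted vertices and messages. */
class SortedSuperstepScan {
    /** One (vertexId, message) pair as read from the sorted message stream. */
    static final class Entry {
        final long vertexId;
        final String message;
        Entry(long vertexId, String message) {
            this.vertexId = vertexId;
            this.message = message;
        }
    }

    /**
     * Calls compute for every vertex, in order, with the messages addressed
     * to it. Both inputs must be sorted by vertex id in ascending order; the
     * message iterator is only ever advanced, never rewound.
     */
    static void scan(List<Long> sortedVertexIds,
                     Iterator<Entry> sortedMessages,
                     BiConsumer<Long, List<String>> compute) {
        Entry pending = sortedMessages.hasNext() ? sortedMessages.next() : null;
        for (long vertexId : sortedVertexIds) {
            // Skip messages for ids that have no vertex in this partition.
            while (pending != null && pending.vertexId < vertexId) {
                pending = sortedMessages.hasNext() ? sortedMessages.next() : null;
            }
            // Collect the contiguous block of messages for this vertex.
            List<String> inbox = new ArrayList<>();
            while (pending != null && pending.vertexId == vertexId) {
                inbox.add(pending.message);
                pending = sortedMessages.hasNext() ? sortedMessages.next() : null;
            }
            compute.accept(vertexId, inbox);
        }
    }
}
```

Each message is read exactly once and in order, so the cost is linear in the
number of vertices plus the number of messages. With several sorted runs on
disk, the single iterator would be replaced by a k-way merge over the runs,
which is not shown here.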

Does that make sense? Do you want me to share the complete prototype-ish
library as an attachment here this evening?

> Improve the way to keep outgoing messages
> -----------------------------------------
>                 Key: GIRAPH-45
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-45
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp
>            Reporter: Hyunsik Choi
>            Assignee: Hyunsik Choi
> As discussed in GIRAPH-12(http://goo.gl/CE32U), I think that there is a 
> potential problem to cause out of memory when the rate of message generation 
> is higher than the rate of message flush (or network bandwidth).
> To overcome this problem, we need more eager strategy for message flushing or 
> some approach to spill messages into disk.
> The below link is Dmitriy's suggestion.
> https://issues.apache.org/jira/browse/GIRAPH-12?focusedCommentId=13116253&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13116253
