Claudio Martella commented on GIRAPH-45:
Yes, we don't, I agree. As a matter of fact, during the last two days I've been
working on an implementation of the previous model with my own code. It's going
quite smoothly: my prototype, not integrated with Giraph, should be ready by
the end of today. If you're in a hurry, I can upload it as an attachment as
soon as it's complete (again as an external class, not as a Giraph patch), and
you can help me out (much appreciated) with the integration (as a Giraph patch
this time =) ).
The prototype implements a Memstore + SequenceFile approach to store the
(I vertex, M message) pairs *sorted* on disk. As we don't deal with durability,
deletes or updates, this turned out to be quite simple and quite
disk-efficient. We have the typical memstore where we store the data; when we
reach a given threshold (maxSize or some percentage of free heap) we flush it
to disk, sorted by vertex, and we keep a small BTree index for that
SequenceFile in memory. As we don't have updates or deletes, we don't really
need to merge as in BigTable; as we don't do random reads, we don't need
compaction; and as we don't provide durability, we don't need log files.
Before proceeding with the compute() of a given vertex, we ask all the
SequenceFiles to seek to the first message for that vertex, if any, and we
read sequentially across the memstore and the files. If we don't send many
messages, we basically never touch the disk, only the memstore, and we fall
back to the in-memory-only approach that is in Giraph right now, without any
special code to handle the two cases.
Also, when we read, we get a stream with ALL the messages for the vertex, so
the approach would be quite efficient for combiners.
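To make the write and read paths above concrete, here is a minimal sketch, not the prototype itself. It assumes long vertex ids and double messages, uses plain temp files in place of Hadoop SequenceFiles, and a `TreeMap` sparse index (vertexId to byte offset) in place of the in-memory BTree. Only the maxSize trigger is modelled, not the free-heap one, and the class name `SpillingMessageStore` is hypothetical.

```java
import java.io.*;
import java.nio.file.*;
import java.util.*;

// Hypothetical sketch: long vertex ids, double messages. Plain temp files stand
// in for Hadoop SequenceFiles; a TreeMap sparse index stands in for the BTree.
public class SpillingMessageStore implements Closeable {
    private static final int RECORD_BYTES = Long.BYTES + Double.BYTES;
    private static final int INDEX_EVERY = 4; // sample every 4th record into the index

    private final int maxSize; // flush threshold, counted in messages
    private TreeMap<Long, List<Double>> memstore = new TreeMap<>();
    private int memCount = 0;
    private final List<Path> runs = new ArrayList<>();
    private final List<TreeMap<Long, Long>> indexes = new ArrayList<>(); // vertexId -> byte offset

    public SpillingMessageStore(int maxSize) { this.maxSize = maxSize; }

    public void add(long vertexId, double message) throws IOException {
        memstore.computeIfAbsent(vertexId, k -> new ArrayList<>()).add(message);
        if (++memCount >= maxSize) flush();
    }

    // Writes the memstore to a new file in vertex order. Files are never
    // updated, merged or compacted afterwards.
    private void flush() throws IOException {
        Path run = Files.createTempFile("messages", ".run");
        TreeMap<Long, Long> index = new TreeMap<>();
        long offset = 0, n = 0;
        try (DataOutputStream out = new DataOutputStream(
                new BufferedOutputStream(Files.newOutputStream(run)))) {
            for (Map.Entry<Long, List<Double>> e : memstore.entrySet()) {
                for (double m : e.getValue()) {
                    if (n++ % INDEX_EVERY == 0) index.putIfAbsent(e.getKey(), offset);
                    out.writeLong(e.getKey());
                    out.writeDouble(m);
                    offset += RECORD_BYTES;
                }
            }
        }
        runs.add(run);
        indexes.add(index);
        memstore = new TreeMap<>();
        memCount = 0;
    }

    // Seeks each file to an indexed offset strictly before vertexId, scans
    // forward until the ids pass vertexId, then appends the memstore messages.
    public List<Double> messagesFor(long vertexId) throws IOException {
        List<Double> result = new ArrayList<>();
        for (int i = 0; i < runs.size(); i++) {
            Map.Entry<Long, Long> start = indexes.get(i).lowerEntry(vertexId);
            try (RandomAccessFile raf = new RandomAccessFile(runs.get(i).toFile(), "r")) {
                raf.seek(start == null ? 0 : start.getValue());
                while (raf.getFilePointer() < raf.length()) {
                    long id = raf.readLong();
                    double m = raf.readDouble();
                    if (id > vertexId) break;
                    if (id == vertexId) result.add(m);
                }
            }
        }
        result.addAll(memstore.getOrDefault(vertexId, Collections.emptyList()));
        return result;
    }

    @Override
    public void close() throws IOException {
        for (Path run : runs) Files.deleteIfExists(run);
    }

    public static void main(String[] args) throws IOException {
        try (SpillingMessageStore store = new SpillingMessageStore(3)) {
            store.add(7, 1.0); store.add(2, 5.0); store.add(7, 2.0); // third add flushes
            store.add(7, 3.0); store.add(9, 4.0);                    // stay in memstore
            System.out.println(store.messagesFor(7)); // [1.0, 2.0, 3.0]
        }
    }
}
```

Because files are only ever appended as new sorted runs, the read path needs just one seek per file followed by a forward scan, which matches the "no merge, no compaction, no log" reasoning above.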
Basically, if senders also partially sorted messages before sending, what we
would have is something very similar to the shuffle & sort phase of MR.
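Sender-side partial sorting, which is only floated here as a possibility, could look roughly like the following sketch: each outgoing buffer is sorted by destination vertex before it is sent, and messages to the same vertex are folded with a combiner, much like a map-side spill with a combiner in MR. The class `SortedOutbox`, its `Msg` type and the `sortAndCombine` method are hypothetical, and combining assumes the algorithm's messages are combinable (e.g. a sum or max).

```java
import java.util.*;
import java.util.function.DoubleBinaryOperator;

// Hypothetical sketch: before a buffer of outgoing messages is sent, sort it by
// destination vertex and fold messages to the same vertex with a combiner.
public class SortedOutbox {
    static final class Msg {
        final long dest;
        final double value;
        Msg(long dest, double value) { this.dest = dest; this.value = value; }
        @Override public String toString() { return dest + "=" + value; }
    }

    // Each call turns one buffer into a sorted run, like a map-side spill in MR,
    // so the receiver would only have to merge sorted runs.
    static List<Msg> sortAndCombine(List<Msg> buffer, DoubleBinaryOperator combiner) {
        List<Msg> sorted = new ArrayList<>(buffer);
        sorted.sort(Comparator.comparingLong((Msg m) -> m.dest));
        List<Msg> run = new ArrayList<>();
        for (Msg m : sorted) {
            int last = run.size() - 1;
            if (last >= 0 && run.get(last).dest == m.dest) {
                // same destination as the previous message: combine instead of appending
                run.set(last, new Msg(m.dest, combiner.applyAsDouble(run.get(last).value, m.value)));
            } else {
                run.add(m);
            }
        }
        return run;
    }

    public static void main(String[] args) {
        List<Msg> buffer = List.of(new Msg(9, 1.0), new Msg(3, 2.0), new Msg(9, 4.0));
        System.out.println(sortAndCombine(buffer, Double::sum)); // [3=2.0, 9=5.0]
    }
}
```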
This brings me to some considerations about your points.
The way I see it, what makes MR not so efficient for iterative graph
processing, apart from job boot time, is that it forwards not only the messages
but also the graph structure/topology between the map and reduce phases and
between jobs. On our side, we never hit the disk for messages and we keep the
graph structure stateful in memory, so we have a double win (which makes it
quite difficult to sell a comparison between MR and Pregel). If we start
hitting the disk to save the graph and the messages (and hitting the network
heavily, since you're proposing HDFS: bear in mind that A might write to DFS
node B data that is actually directed to C, which means double network access,
a bit as if mappers wrote to HDFS in MapReduce), we'd better just use
MapReduce in the first place.
Of course, I see you're saving the multiple job boot times and you're not
actually writing EVERYTHING, only what doesn't fit in memory, but I have a
possibly simpler idea in mind that goes in the same direction as yours. If we
start from the assumption that we can keep at least the graph in memory, we
can basically never write it to disk and just write the messages to disk when
necessary (in our case more messages would hit the disk than in your scenario,
because keeping the whole graph in memory puts more pressure on the heap). But
because we have a quite memory-efficient way of storing the messages on disk,
in sorted order, we can take advantage of it when we run the superstep.
Suppose the vertices in each partition are sorted by vertex ID, just as their
messages are sorted on disk. Iterating over the vertices belonging to that
partition while scanning the messages directed to them would then be a linear
1-1 mapping, meaning we'd never have to seek in the on-disk message box. What
this would give us is something similar to MapReduce in how the
values/messages are handled, but still Pregel-ish in how the structure is kept
in memory.
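The linear 1-1 mapping between sorted vertices and sorted messages is essentially a merge join. The sketch below assumes in-memory arrays as a stand-in for the sorted on-disk message box and a callback as a stand-in for `compute()`; the class `SortedSuperstep` and its method are hypothetical. The only point it illustrates is that the message cursor moves strictly forward, so no seek is needed.

```java
import java.util.*;
import java.util.function.BiConsumer;

// Hypothetical sketch: one superstep over a partition whose vertex ids are
// sorted, with the message stream sorted the same way. The cursor into the
// message stream only moves forward, so no seek is ever needed.
public class SortedSuperstep {
    static void superstep(long[] vertexIds, long[] msgDest, double[] msgValue,
                          BiConsumer<Long, List<Double>> compute) {
        int j = 0; // position in the sorted message stream (stand-in for the on-disk message box)
        for (long v : vertexIds) {
            // skip messages addressed to ids absent from this partition (ignored here)
            while (j < msgDest.length && msgDest[j] < v) j++;
            List<Double> inbox = new ArrayList<>();
            while (j < msgDest.length && msgDest[j] == v) inbox.add(msgValue[j++]);
            compute.accept(v, inbox); // stand-in for vertex.compute(messages)
        }
    }

    public static void main(String[] args) {
        long[] vertices = {1, 4, 6};
        long[] dest = {1, 1, 6};
        double[] value = {0.5, 0.25, 2.0};
        superstep(vertices, dest, value, (v, msgs) -> System.out.println(v + " " + msgs));
        // prints, one per line: 1 [0.5, 0.25] / 4 [] / 6 [2.0]
    }
}
```

Each vertex and each message is visited exactly once, so a superstep costs one sequential pass over the partition and its messages, which is the MapReduce-like reduce-side behaviour described above.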
Does it make sense? Do you want me to share the complete prototype-ish library
as an attachment here this evening?
> Improve the way to keep outgoing messages
> Key: GIRAPH-45
> URL: https://issues.apache.org/jira/browse/GIRAPH-45
> Project: Giraph
> Issue Type: Improvement
> Components: bsp
> Reporter: Hyunsik Choi
> Assignee: Hyunsik Choi
> As discussed in GIRAPH-12 (http://goo.gl/CE32U), I think there is a
> potential out-of-memory problem when the rate of message generation
> is higher than the rate of message flushing (or the network bandwidth).
> To overcome this problem, we need a more eager strategy for message flushing
> or some approach to spill messages to disk.
> The link below is Dmitriy's suggestion.