[ https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13171047#comment-13171047 ]
Claudio Martella commented on GIRAPH-45:
----------------------------------------

I must have explained the idea badly. The output format is basically:

(key0, value2)
(key0, value1)
(key0, value4)
...
(key1, value4)
(key1, value2)
(key1, value7)
...

That is, the output file is sorted by key, but the messages for a given key are not sorted: they end up in the file in the order they arrived. The index is a tree holding a sample of the vertices (e.g. every N-th vertex, so roughly #totalVertices / N entries) together with their positions inside that file. So if you want to look up the messages sent to a particular vertex, i.e. at the beginning of that vertex's superstep, you look in the tree for floorEntry(key) and scan from that position until you reach the beginning of the block where your messages are stored (and you repeat this for every file we flushed). To minimize disk I/O, I'm using a Bloom filter so that I don't look into files where I can reasonably tell there won't be any messages for my vertex. At that point you can just iterate over the messages one after the other and feed them to the message iterator passed to the compute() method. So the index doesn't have one entry per vertex, only entries for a subset of the vertices (and no entries at all for the messages).

An easier way would be to do as you suggest: store the messages in separate files, flush them unsorted whenever they reach a certain threshold, and then load them entirely at the beginning of a partition's superstep. This would let us avoid sorting, but we would need to keep all the messages for all the vertices of one partition in memory at the same time. Maybe that's acceptable; I was going for the approach that puts less pressure on memory.
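A minimal Java sketch of the lookup described above, for a single flushed file. Everything in it is hypothetical and not Giraph's API: the class name SpillFileSketch, the long vertex IDs and long messages, the in-memory byte[] standing in for the file, the TreeMap<Long, Integer> used as the sparse index, and the toy two-hash Bloom filter over a BitSet. It only illustrates the idea: records sorted by key, an index entry for every N-th vertex, floorEntry() to find where to start scanning, and a Bloom filter check to skip the file altogether.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/**
 * Hypothetical in-memory stand-in for ONE flushed message file: (vertexId, message)
 * records sorted by vertexId, a sparse index over a sample of the vertex IDs, and a
 * toy Bloom filter. Vertex IDs and messages are longs purely for illustration.
 */
class SpillFileSketch {
    private static final int BLOOM_BITS = 1024;
    private static final int RECORD_BYTES = 16; // one long key + one long message

    private final byte[] data;
    private final TreeMap<Long, Integer> sparseIndex = new TreeMap<>(); // sampled vertexId -> byte offset
    private final BitSet bloom = new BitSet(BLOOM_BITS);

    /** Writes all records sorted by key; indexes one out of every sampleEvery vertices. */
    SpillFileSketch(SortedMap<Long, List<Long>> messagesByVertex, int sampleEvery) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        try {
            int vertexCount = 0;
            for (Map.Entry<Long, List<Long>> e : messagesByVertex.entrySet()) {
                long vertexId = e.getKey();
                if (vertexCount++ % sampleEvery == 0) {
                    sparseIndex.put(vertexId, out.size()); // offset of this vertex's first record
                }
                bloom.set(hash(vertexId, 1));
                bloom.set(hash(vertexId, 2));
                for (long msg : e.getValue()) { // messages keep their arrival order
                    out.writeLong(vertexId);
                    out.writeLong(msg);
                }
            }
            out.flush();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex); // cannot happen for an in-memory stream
        }
        data = bytes.toByteArray();
    }

    /** Toy hash for the Bloom filter; a real one would use a better hash family. */
    private static int hash(long key, int seed) {
        long h = (key + seed) * 0x9E3779B97F4A7C15L;
        h ^= h >>> 29;
        return (int) Math.floorMod(h, (long) BLOOM_BITS);
    }

    /** Returns vertexId's messages from this file, in the order they were written. */
    List<Long> messagesFor(long vertexId) {
        List<Long> result = new ArrayList<>();
        if (!bloom.get(hash(vertexId, 1)) || !bloom.get(hash(vertexId, 2))) {
            return result; // definitely no messages here: skip the file without reading it
        }
        Map.Entry<Long, Integer> start = sparseIndex.floorEntry(vertexId);
        if (start == null) {
            return result; // smaller than every key in this file
        }
        int offset = start.getValue();
        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(data, offset, data.length - offset));
        try {
            for (int remaining = data.length - offset; remaining >= RECORD_BYTES; remaining -= RECORD_BYTES) {
                long key = in.readLong();
                long msg = in.readLong();
                if (key == vertexId) {
                    result.add(msg);
                } else if (key > vertexId) {
                    break; // file is sorted by key, so our block (if any) is behind us
                }
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return result;
    }
}
```

A vertex's full message set would be the concatenation of messagesFor() over every flushed file. The sampling rate (sampleEvery here) trades index memory against how far each lookup has to scan.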
Another optimization I was going to look into is changing the format to:

(key0, #bytes, [message2, message1, message4])
(key1, #bytes, [message4, message2, message7])

This would let us, while scanning the file for our block, skip straight to the next key without deserializing all the messages along the way. It requires serializing the message list to a byte[] first, so that I can write #bytes before writing the messages to disk. I'll implement this once the naive version is ready. Does this approach make sense to you?

> Improve the way to keep outgoing messages
> -----------------------------------------
>
> Key: GIRAPH-45
> URL: https://issues.apache.org/jira/browse/GIRAPH-45
> Project: Giraph
> Issue Type: Improvement
> Components: bsp
> Reporter: Hyunsik Choi
> Assignee: Hyunsik Choi
>
> As discussed in GIRAPH-12 (http://goo.gl/CE32U), I think there is a
> potential out-of-memory problem when the rate of message generation
> is higher than the rate of message flushing (or the network bandwidth).
> To overcome this problem, we need a more eager strategy for message flushing or
> some way to spill messages to disk.
> The link below is Dmitriy's suggestion.
> https://issues.apache.org/jira/browse/GIRAPH-12?focusedCommentId=13116253&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13116253
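The (key, #bytes, [messages]) layout proposed in the comment can be sketched the same way. Again, LengthPrefixedBlocks, writeFile() and readMessages() are hypothetical names, and String messages written with DataOutputStream.writeUTF() stand in for the real serialized messages. The point is that the reader decodes only the key and length of other vertices' blocks and uses skipBytes() to jump over their payloads.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/**
 * Hypothetical sketch of the proposed (key, #bytes, [messages]) layout. Vertex IDs are
 * longs and messages are Strings written with writeUTF, purely for illustration.
 */
class LengthPrefixedBlocks {

    /** Writes one block per vertex, in key order: vertexId, payload length, payload. */
    static byte[] writeFile(SortedMap<Long, List<String>> messagesByVertex) {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(file);
        try {
            for (Map.Entry<Long, List<String>> e : messagesByVertex.entrySet()) {
                // Serialize the message list to a byte[] first, so #bytes is known up front.
                ByteArrayOutputStream buf = new ByteArrayOutputStream();
                DataOutputStream msgOut = new DataOutputStream(buf);
                for (String msg : e.getValue()) {
                    msgOut.writeUTF(msg);
                }
                msgOut.flush();
                byte[] payload = buf.toByteArray();
                out.writeLong(e.getKey());
                out.writeInt(payload.length);
                out.write(payload);
            }
            out.flush();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex); // cannot happen for in-memory streams
        }
        return file.toByteArray();
    }

    /** Finds vertexId's block, skipping every other block without decoding its messages. */
    static List<String> readMessages(byte[] fileBytes, long vertexId) {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(fileBytes));
        try {
            while (in.available() > 0) {
                long key = in.readLong();
                int numBytes = in.readInt();
                if (key < vertexId) {
                    in.skipBytes(numBytes); // jump straight to the next key
                } else if (key > vertexId) {
                    break;                  // blocks are sorted by key: vertexId is absent
                } else {
                    byte[] payload = new byte[numBytes];
                    in.readFully(payload);
                    DataInputStream msgIn = new DataInputStream(new ByteArrayInputStream(payload));
                    List<String> messages = new ArrayList<>();
                    while (msgIn.available() > 0) {
                        messages.add(msgIn.readUTF());
                    }
                    return messages;
                }
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return Collections.emptyList();
    }
}
```

With a real on-disk file, skipBytes() may skip fewer bytes than requested, so a production reader would need to check its return value or seek explicitly (for example with RandomAccessFile.seek()).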