[ 
https://issues.apache.org/jira/browse/GIRAPH-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13169756#comment-13169756
 ] 

Avery Ching edited comment on GIRAPH-45 at 12/14/11 10:26 PM:
--------------------------------------------------------------

I've been thinking about this a bit more.  I don't think we actually need a 
database if we use a disk-friendly approach and take advantage of our knowledge 
of the system.  Here is a rough proposal:

There are two ways we can save memory here: an out-of-core graph and 
out-of-core messages.  This way, we can use memory as a cache rather than as a 
fully in-memory database and messaging system.

Here's how we can do the out-of-core graph:

Workers already do the computation by partition.  All partitions owned by the 
worker need to be processed, and we want to minimize the amount of data 
loaded from/stored to local disk (i.e. <superstep>.<worker id>.<partition 
#>.vertices files).  Local disk should be used here because it will be faster 
and no remote worker needs to access this data directly.

Therefore, the general algorithm would be:

{noformat}
// First pass: partitions already in memory.
for (Partition partition : inMemoryPartitions) {
  partition.computeAndGenerateOutgoingMessages();
  if (memoryPressure()) {
    partition.storeToFileSystem();
  }
}
// Second pass: partitions previously spilled to the local file system.
for (Partition partition : fileSystemPartitions) {
  partition.loadFromFileSystem();
  partition.computeAndGenerateOutgoingMessages();
  if (memoryPressure()) {
    partition.storeToFileSystem();
  }
}
{noformat} 

This should keep our partition cache as full as possible while keeping the 
loading/storing of partitions that can't fit in memory to a minimum.
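
For concreteness, here is a rough sketch of what the memory-pressure check and 
the per-partition spill to local disk could look like.  Everything below (the 
heap-based memoryPressure() heuristic, the Partition/Vertex accessors, the 
localSpillDir parameter) is illustrative, not existing Giraph code.

{noformat}
// Illustrative sketch only, not existing Giraph code.  A heap-based check is
// just one possible heuristic for "memoryPressure".
boolean memoryPressure() {
  Runtime rt = Runtime.getRuntime();
  long used = rt.totalMemory() - rt.freeMemory();
  return used > 0.9 * rt.maxMemory();
}

// Inside a hypothetical Partition class: spill this partition's vertices to a
// local file named <superstep>.<worker id>.<partition #>.vertices and free the
// heap space.  "vertices" is the in-memory vertex list.
void storeToFileSystem(long superstep, int workerId, File localSpillDir)
    throws IOException {
  File file = new File(localSpillDir,
      superstep + "." + workerId + "." + getId() + ".vertices");
  DataOutputStream out = new DataOutputStream(
      new BufferedOutputStream(new FileOutputStream(file)));
  try {
    out.writeInt(vertices.size());
    for (Vertex vertex : vertices) {
      vertex.write(out);    // vertices are Writable
    }
  } finally {
    out.close();
  }
  vertices.clear();
}
// loadFromFileSystem() would be the mirror image: read the vertex count, then
// readFields() each vertex back into the partition.
{noformat}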

Here's how we can do the out-of-core messaging:

As the partitions are being processed by the workers, outgoing messages are 
currently kept in memory.  They are flushed if a message list grows to a 
certain size.  Otherwise, the messages are bulk sent at the end of the 
computation.

What we can do is wait for a sendMessageReq and check for memory pressure.  If 
memory pressure is an issue, then dump all the outgoing messages to HDFS files 
(i.e. <superstep>.<worker id>.<partition #>.outgoingMessages).  Future 
sendMessageReq calls may keep messages in memory or dump them to the same HDFS 
files if memory pressure is still an issue.  These HDFS files are closed prior 
to the flush.  During the flush, the worker sends the in-memory messages to the 
destinations as normal, as well as the filenames of the out-of-core messages to 
their respective owners.  Note that the files are stored in HDFS so that a 
remote worker can load the messages as it sees fit.  Maybe reduce the 
replication factor to 2 by default for these files?
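
To make the HDFS side concrete, a sketch of the spill path might look like the 
following.  MsgWritable, messageSpillOut, and the file naming are made up for 
illustration; only the Hadoop FileSystem calls are real API.

{noformat}
// Rough sketch; one spill file per destination partition, named
// <superstep>.<worker id>.<partition #>.outgoingMessages.  The stream is kept
// open so later sendMessageReq calls can keep writing to it, and is closed
// prior to the flush.  MsgWritable is a hypothetical Writable message type.
private FSDataOutputStream messageSpillOut;

private void spillOutgoingMessages(long superstep, int workerId, int partitionId,
    List<MsgWritable> messages, Configuration conf) throws IOException {
  if (messageSpillOut == null) {
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path(
        superstep + "." + workerId + "." + partitionId + ".outgoingMessages");
    // Replication factor of 2 should be plenty for transient message data.
    messageSpillOut = fs.create(path, (short) 2);
  }
  for (MsgWritable msg : messages) {
    msg.write(messageSpillOut);
  }
  messages.clear();    // this memory is what we are trying to reclaim
}
{noformat}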

This tactic should reduce memory usage on the destination worker as well, since 
the destination workers don't need to load the HDFS files until they are 
actually doing the computation for that partition.
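
On the destination side, a lazy read could be as simple as the sketch below 
(again, MsgWritable and the EOF-based framing are only illustrative):

{noformat}
// Rough sketch; the destination worker only opens the spill files it was told
// about in the flush once it actually computes the owning partition.
private List<MsgWritable> loadSpilledMessages(List<Path> spillFiles,
    Configuration conf) throws IOException {
  List<MsgWritable> messages = new ArrayList<MsgWritable>();
  FileSystem fs = FileSystem.get(conf);
  for (Path path : spillFiles) {
    FSDataInputStream in = fs.open(path);
    try {
      while (true) {
        MsgWritable msg = new MsgWritable();
        try {
          msg.readFields(in);
        } catch (EOFException eof) {
          break;    // end of this spill file
        }
        messages.add(msg);
      }
    } finally {
      in.close();
    }
  }
  return messages;
}
{noformat}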

Checkpoints should be able to point to the out-of-core data as well to reduce 
the amount of data to store.
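
As a sketch of the checkpoint idea (isOutOfCore() and getSpillFilePath() are 
hypothetical), a checkpoint entry could simply record the spill file path 
instead of re-serializing the partition data:

{noformat}
// Sketch only: record a pointer to already-spilled data instead of copying it.
private void writeCheckpointEntry(DataOutput checkpointOut, Partition partition)
    throws IOException {
  if (partition.isOutOfCore()) {
    checkpointOut.writeBoolean(true);                    // data lives in a file
    Text.writeString(checkpointOut, partition.getSpillFilePath());
  } else {
    checkpointOut.writeBoolean(false);                   // data follows inline
    partition.write(checkpointOut);
  }
}
{noformat}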

Still, there is one remaining piece: loading the graph.  This can also run out 
of memory.  Currently, vertex lists are batched and sent to destination workers 
by partition.  Partitions should have the ability to be incrementally dumped to 
local files on the destination if there is memory pressure.  Then, prior to the 
1st superstep, each partition can be assembled (local files + any vertices 
still in memory) and can use the out-of-core graph algorithm indicated above.
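
As a sketch of that loading path (all names here are made up), the destination 
could do something like this when a vertex batch arrives:

{noformat}
// Rough sketch; when a batch of vertices for a partition arrives during
// loading and memory is tight, append it to that partition's local file
// instead of keeping it in memory.
private void addVertexBatch(int partitionId, List<Vertex> batch)
    throws IOException {
  if (memoryPressure()) {
    File file = new File(localSpillDir, "loading." + partitionId + ".vertices");
    DataOutputStream out = new DataOutputStream(
        new BufferedOutputStream(new FileOutputStream(file, true /* append */)));
    try {
      for (Vertex vertex : batch) {
        vertex.write(out);
      }
    } finally {
      out.close();
    }
  } else {
    partitionMap.get(partitionId).addVertices(batch);
  }
}
// Prior to the 1st superstep, each partition is assembled from its local file
// plus whatever vertices stayed in memory.
{noformat}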

This proposal should take advantage of large reads/writes so that we don't need 
a database.  I will require out-of-core storage in the very near future, as the 
graph I need to load will have billions of edges and I probably won't have 
enough nodes and memory to keep it all in core.  Please let me know your 
thoughts on this approach.

                
> Improve the way to keep outgoing messages
> -----------------------------------------
>
>                 Key: GIRAPH-45
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-45
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp
>            Reporter: Hyunsik Choi
>            Assignee: Hyunsik Choi
>
> As discussed in GIRAPH-12 (http://goo.gl/CE32U), I think there is a potential 
> out-of-memory problem when the rate of message generation is higher than the 
> rate of message flushing (or the network bandwidth).
> To overcome this problem, we need a more eager strategy for message flushing 
> or some approach to spill messages to disk.
> The below link is Dmitriy's suggestion.
> https://issues.apache.org/jira/browse/GIRAPH-12?focusedCommentId=13116253&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13116253
