-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/6013/
-----------------------------------------------------------

(Updated Aug. 1, 2012, 7:25 p.m.)


Review request for giraph.


Changes
-------

Fixed partition exchange part.


Description
-------

This patch introduces out-of-core message support for Giraph. Some ideas are 
taken from the discussion in https://issues.apache.org/jira/browse/GIRAPH-45. 

We keep messages in a MessageStore, from which they are flushed to disk when 
necessary. Messages are separated by partition. At any moment we flush only a 
single partition to disk, so the rest stays in memory in case it's time for 
the next superstep. When flushing to disk, we write in the following format:
numberOfVertices
vertexId1 numberOfMessages1 messagesForVertexId1
vertexId2 numberOfMessages2 messagesForVertexId2
...
Vertex ids are sorted. We don't require enough memory to fit all the messages 
for a partition, but we do require that the messages for a single vertex fit in 
memory. In the end we potentially have several files for each partition. When 
reading messages, all the files are read sequentially.
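
As a rough illustration of the layout above, here is a minimal sketch that 
writes and reads it with plain Java streams. It is not the code from the patch: 
MessageFileFormatSketch, encode and decode are hypothetical names, long vertex 
ids and int messages stand in for Giraph's Writable types, and a byte array 
stands in for the file on disk.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.function.BiConsumer;

/** Hypothetical helper, not part of the patch. */
final class MessageFileFormatSketch {
  private MessageFileFormatSketch() { }

  /** Serializes the messages of one flush; a real store would write to a file. */
  static byte[] encode(SortedMap<Long, List<Integer>> messages) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(messages.size());                 // numberOfVertices
      // With natural ordering, the map iterates in ascending vertex id order.
      for (Map.Entry<Long, List<Integer>> entry : messages.entrySet()) {
        out.writeLong(entry.getKey());               // vertexId
        out.writeInt(entry.getValue().size());       // numberOfMessages
        for (int message : entry.getValue()) {
          out.writeInt(message);                     // messagesForVertexId
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** Reads sequentially, handing over the messages of one vertex at a time. */
  static void decode(byte[] data, BiConsumer<Long, List<Integer>> consumer) {
    try (DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(data))) {
      int numberOfVertices = in.readInt();
      for (int v = 0; v < numberOfVertices; v++) {
        long vertexId = in.readLong();
        int numberOfMessages = in.readInt();
        List<Integer> vertexMessages = new ArrayList<>(numberOfMessages);
        for (int m = 0; m < numberOfMessages; m++) {
          vertexMessages.add(in.readInt());
        }
        consumer.accept(vertexId, vertexMessages);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Because the reader only ever holds the messages of the vertex it is currently 
decoding, the sketch matches the stated requirement that a single vertex's 
messages fit in memory, not the whole partition's.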

DiskBackedMessageStoreByPartition handles all messages, DiskBackedMessageStore 
is then used for a single partition, and SequentialFileMessageStore handles a 
single file.
There is also SimpleMessageStore, which doesn't use disk at all.

Options available to the user:
- whether or not to use out-of-core messaging
- number of messages to keep in memory - this should probably be changed 
(explained below)
- size of the buffer used when reading from and writing to disk

ServerData now has two message store instances: one which is consumed in the 
current superstep and holds messages from the previous superstep, and one in 
which it keeps incoming messages for the next superstep.
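
The two-store arrangement can be pictured with the following minimal sketch. 
It is only an assumption about the general shape, not the ServerData code from 
the patch: TwoStoreServerDataSketch and its methods are hypothetical, and plain 
in-memory maps stand in for the message stores.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for ServerData; names and types are illustrative. */
final class TwoStoreServerDataSketch {
  /** Messages sent in the previous superstep, consumed by compute now. */
  private Map<Long, List<String>> currentMessages = new HashMap<>();
  /** Messages arriving during this superstep, for use in the next one. */
  private Map<Long, List<String>> incomingMessages = new HashMap<>();

  void addIncoming(long vertexId, String message) {
    incomingMessages
        .computeIfAbsent(vertexId, id -> new ArrayList<>())
        .add(message);
  }

  List<String> getCurrent(long vertexId) {
    return currentMessages.getOrDefault(vertexId, Collections.emptyList());
  }

  /** Between supersteps: incoming becomes current, a fresh store takes over. */
  void prepareSuperstep() {
    currentMessages = incomingMessages;
    incomingMessages = new HashMap<>();
  }
}
```

Keeping the stores separate means messages received during a superstep never 
become visible to compute calls of that same superstep.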

Other things which had to be changed:
- Checkpointing - since messages are no longer kept in the vertex, they need 
to be stored separately.
- Partition exchange between workers - for the same reason as above - added 
SendMessagesRequest
- Messages are no longer assigned to the vertex; they are just passed to compute
- compute methods are now executed in order of vertex id within a partition, 
so that reading from disk is fast
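
To show why visiting vertices in id order keeps disk reads sequential, here is 
a minimal sketch of one forward-only cursor per file. It is an illustration 
under assumptions, not the patch's implementation: SortedFileCursorSketch is a 
hypothetical name, and a sorted in-memory map stands in for each file on disk.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;

/** Hypothetical forward-only cursor over one file sorted by vertex id. */
final class SortedFileCursorSketch {
  private final Iterator<Map.Entry<Long, List<String>>> entries;
  private Map.Entry<Long, List<String>> head;

  SortedFileCursorSketch(SortedMap<Long, List<String>> fileContents) {
    entries = fileContents.entrySet().iterator();
    advance();
  }

  private void advance() {
    head = entries.hasNext() ? entries.next() : null;
  }

  /** Appends this file's messages for vertexId; ids must be ascending. */
  void collect(long vertexId, List<String> result) {
    while (head != null && head.getKey() < vertexId) {
      advance(); // skip vertices that were not asked for; never seek back
    }
    if (head != null && head.getKey() == vertexId) {
      result.addAll(head.getValue());
      advance();
    }
  }

  /** Gathers the messages for one vertex from all files of a partition. */
  static List<String> messagesFor(
      long vertexId, List<SortedFileCursorSketch> files) {
    List<String> result = new ArrayList<>();
    for (SortedFileCursorSketch file : files) {
      file.collect(vertexId, result);
    }
    return result;
  }
}
```

Each cursor only moves forward, so as long as compute is called in ascending 
vertex id order every file is read once from start to end.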

For the memory check I only have the number of messages which I allow in 
memory. This should be done better, but there is a problem since Alessandro's 
patch for the out-of-core graph also has memory checks. We don't want one of 
those parts to use all the memory and leave too little space for the other, but 
I'm not aware of a way to separately check the memory usage of different data 
structures.
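
A count-based check of this kind could look roughly like the sketch below. 
Everything in it is an assumption made for illustration: MessageCountLimitSketch 
is a hypothetical class, only message counts are tracked (not the messages 
themselves), and choosing the partition with the most in-memory messages as the 
one to flush is this sketch's policy, not something the patch description 
states.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical count-based limit; tracks counts only, not real messages. */
final class MessageCountLimitSketch {
  private final int maxMessagesInMemory;
  private final Map<Integer, Integer> inMemoryPerPartition = new HashMap<>();
  private int totalInMemory;
  private int flushes;

  MessageCountLimitSketch(int maxMessagesInMemory) {
    if (maxMessagesInMemory < 0) {
      throw new IllegalArgumentException("limit must not be negative");
    }
    this.maxMessagesInMemory = maxMessagesInMemory;
  }

  void addMessages(int partitionId, int count) {
    if (count <= 0) {
      throw new IllegalArgumentException("count must be positive");
    }
    inMemoryPerPartition.merge(partitionId, count, Integer::sum);
    totalInMemory += count;
    // Flush one partition at a time until we are back under the limit.
    while (totalInMemory > maxMessagesInMemory) {
      flushLargestPartition();
    }
  }

  /** Assumed policy: flush the partition holding the most messages. */
  private void flushLargestPartition() {
    int largestPartition = -1;
    int largestCount = 0;
    for (Map.Entry<Integer, Integer> entry : inMemoryPerPartition.entrySet()) {
      if (entry.getValue() > largestCount) {
        largestPartition = entry.getKey();
        largestCount = entry.getValue();
      }
    }
    // A real store would write the partition's messages to a file here.
    inMemoryPerPartition.remove(largestPartition);
    totalInMemory -= largestCount;
    flushes++;
  }

  int getTotalInMemory() {
    return totalInMemory;
  }

  int getFlushes() {
    return flushes;
  }
}
```

Counting messages ignores their size, which is why a count is only a crude 
proxy for memory usage.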

I didn't integrate this with RPC; that's why there are some checks for 
useNetty, which can be removed once RPC is removed. Also, since the vertex 
doesn't keep messages in itself anymore, once RPC is removed we should also 
remove getMessages/putMessages/getNumMessages from vertex, change initialize to 
(id, value, edges, hasMessages) and just give messages to the vertex when 
calling compute.

I'll fix the part where partitions are sent around before a superstep, since 
that's now the only part which requires that all the messages for a single 
partition fit in memory.


This addresses bug GIRAPH-45.
    https://issues.apache.org/jira/browse/GIRAPH-45


Diffs (updated)
-----

  
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java 1366303
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java 1366303
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java 1366303
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java 1366303
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java 1366303
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java 1366303
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java 1366303
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestRegistry.java 1366303
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java 1366303
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java 1366303
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java 1366303
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStore.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SendPartitionCurrentMessagesRequest.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java 1366303
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java 1366303
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java 1366303
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java 1366303
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java 1366303
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java 1366303
  http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/utils/CollectionUtils.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java 1366303
  http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java 1366303
  http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/comm/TestMessageStores.java PRE-CREATION
  http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java 1366303

Diff: https://reviews.apache.org/r/6013/diff/


Testing
-------

Ran mvn verify and the tests in pseudo-distributed mode; all pass apart from 
the one tracked in https://issues.apache.org/jira/browse/GIRAPH-259.


Thanks,

Maja Kabiljo
