Nice discussion.
I'm totally with Maja on this one. The problem with
INPUT_SUPERSTEP is connected to the other JIRA, where the graph is
spilled to disk, and I personally don't see how messages would affect
it. In fact, I believe the problem with INPUT_SUPERSTEP shows up
with big graphs, mostly when keeping the graph in memory (nearly)
fills the available memory.
Also, the impact of out-of-core messages really depends on the
algorithm and its semantics with respect to message creation.
I had some algorithms that passed the INPUT_SUPERSTEP smoothly but
failed in the subsequent supersteps because of the messages (e.g. with a
very dense graph).
That being said, I agree tests should be run systematically. When I
designed the out-of-core messages strategy presented in the JIRA and
implemented by Maja, it was meant (and still is) to be part of a
paper. For this reason I'm pretty happy to start discussing a
testing/benchmarking strategy.
1) We should run tests on two algorithms, e.g. PageRank and SSSP. They
are the algorithms used in the original Pregel paper, and they are two
very different algorithms as far as messaging patterns are concerned.
2) We should run tests with a fixed-size graph and a varying threshold
(from 0% to 100% out-of-core messages). We know the number of messages
sent per superstep by each worker; that should give us the baseline.
This should show the cost of IO. I expect the cost, in time, to depend
mostly on the write and read performance of the disks, since we're
reading and writing sequentially, plus the cost of sorting. We should
show that.
3) The number of workers should not affect the cost of out-of-core
messages. The only intuitive advantage is that by fixing the graph
size and increasing the number of workers, fewer vertices are
assigned to each worker and therefore fewer messages are spilled to
disk per worker. This is pretty intuitive and shouldn't really require
intensive testing.
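For the IO cost in point 2, a first-order baseline could come from a micro-benchmark along these lines: time the sequential write and read of a fixed number of fixed-size messages (8-byte doubles, e.g. PageRank values). This is just a sketch; the class name, message count, and temp-file location are placeholders, not anything from the patch.

```java
import java.io.*;
import java.nio.file.*;

public class MessageIoBench {
    /**
     * Writes numMessages doubles (each 1.0 / numMessages) sequentially,
     * reads them back, prints the timings, and returns the sum read back
     * as a checksum (it should be ~1.0).
     */
    static double writeAndReadBack(Path file, int numMessages) throws IOException {
        long start = System.nanoTime();
        try (DataOutputStream out = new DataOutputStream(
                new BufferedOutputStream(Files.newOutputStream(file)))) {
            for (int i = 0; i < numMessages; i++) {
                out.writeDouble(1.0 / numMessages); // a PageRank-style message
            }
        }
        long writeMs = (System.nanoTime() - start) / 1_000_000;

        start = System.nanoTime();
        double sum = 0;
        try (DataInputStream in = new DataInputStream(
                new BufferedInputStream(Files.newInputStream(file)))) {
            for (int i = 0; i < numMessages; i++) {
                sum += in.readDouble();
            }
        }
        long readMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println("write: " + writeMs + " ms, read: " + readMs + " ms");
        Files.delete(file);
        return sum;
    }

    public static void main(String[] args) throws IOException {
        writeAndReadBack(Files.createTempFile("messages", ".bin"), 10_000_000);
    }
}
```

Comparing these numbers against the measured per-superstep out-of-core time would show how much overhead comes from sorting and bookkeeping on top of raw disk throughput.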
We could use the recently contributed graph generator to produce the
input graph.
What do you guys think?
On Fri, Jul 20, 2012 at 3:44 PM, Maja Kabiljo <[email protected]> wrote:
On July 19, 2012, 5:02 p.m., Eli Reisman wrote:
Hi Maja,
This was a lot of really hard work, great job. My general discomfort
with adopting this too quickly is that it's a big change that adds a lot
of new moving parts, and it needs to be extensively tested in two ways:
1. On real clusters, with varying data loads. Testing in pseudo mode
for a change this big doesn't tell us if it helps or hurts us. This does
involve (potentially) a lot of IO, which adds overhead.
2. On algorithms that mutate the graph during compute() supersteps,
so that we can objectively measure what's going on when that case
comes up.
My main point: I would be a lot more comfortable with this and the
other patch spilling partitions to disk (also really great code) if
anyone writing these were collecting metrics and addressing the fact
that we are not having memory problems at very acceptable levels of
scale-out at any time other than INPUT_SUPERSTEP. If we're not focused
on that, we are fixing a lot of stuff that hasn't been proven broken
yet. The metrics all clearly show, during real cluster jobs on a wide
variety of data load sizes, that the critical moment is superstep -1,
when the data is loaded and collections of vertices are sent out over
Netty to their remote homes (partitions) in preparation for the
calculation supersteps.
A broad fix to this would include placing the mappers/workers on
cluster nodes that store a replica of the data they read (as in
Hadoop, restoring locality), or doing the spills to disk during this
phase only, when it's easy since no processing is going on, and
re-loading them once the splits are done and the memory pressure has
receded. For the rest of the processing cycle, they should be fine. As
we scale out further under the same memory constraints, we could add
more creative spilling techniques if needed, once the INPUT_SUPERSTEP
stuff was proven stable. I don't mean to rain on the parade, but this
really seems like a sensible way to go forward.
Regardless, everyone working on disk spill code has done really fine
work, and I admire it. If we adapt it to rein in the scope a bit, or
back these ideas with some realistic testing/metrics, I'll be your
biggest supporter!
Hi Eli,
Thanks a lot for looking into this.
I totally agree that I need to do more testing and measurements for this
solution, that was my plan from the beginning. I uploaded the patch now
since it's big and I wanted to parallelise doing benchmarking and getting
some comments about it. Sorry if that's the wrong way of doing it.
I think whether or not messages are going to cause memory problems
strongly depends on the algorithm being run. The way I see it, there
are cases in which you won't have problems with the graph (not even
during the input superstep) but you will have them with messages. I do
understand what you are saying, and Alessandro is already trying to
address the input superstep problem, but I think this is also good to
have.
Note that out-of-core messaging is implemented as an option, i.e. users
can still choose not to use it, and in that case the way the whole
system works hasn't changed much (in terms of performance).
As for 2., graph mutations don't have a big influence on messaging,
apart from the creation of nonexistent vertices which received
messages. Am I missing something?
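For what it's worth, that mutation case boils down to a resolution step at the end of a superstep: create every vertex that received messages but doesn't yet exist. A plain-Java sketch of that check (the names here are illustrative, not the actual VertexResolver code):

```java
import java.util.*;

public class ResolveSketch {
    /**
     * Returns the ids of vertices that must be created: those that
     * received messages but are not present among the existing vertices.
     */
    static Set<Long> verticesToCreate(Set<Long> existingVertices,
                                      Set<Long> verticesWithMessages) {
        Set<Long> toCreate = new HashSet<>(verticesWithMessages);
        toCreate.removeAll(existingVertices);
        return toCreate;
    }
}
```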
I'll be working on providing some metrics and will update when I do so.
- Maja
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/6013/#review9285
-----------------------------------------------------------
On July 19, 2012, 12:09 p.m., Maja Kabiljo wrote:
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/6013/
-----------------------------------------------------------
(Updated July 19, 2012, 12:09 p.m.)
Review request for giraph.
Description
-------
This patch introduces out-of-core messages support for Giraph. Some
ideas are taken from discussion in
https://issues.apache.org/jira/browse/GIRAPH-45.
We keep messages in a MessageStore, from which they are flushed to
disk when necessary. Messages are separated by partition. At any moment
we flush only a single partition to disk, so we still keep things in
memory in case the next superstep is about to start. When flushing to
disk, we write in the following format:
numberOfVertices
vertexId1 numberOfMessages1 messagesForVertexId1
vertexId2 numberOfMessages2 messagesForVertexId2
...
Vertex ids are sorted. We don't require enough memory to fit all the
messages for a partition, but we do require that the messages for a
single vertex fit in memory.
In the end we potentially have several files for each partition. When
reading messages, all the files are read sequentially.
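A minimal sketch of that file format, with long vertex ids and double messages standing in for Giraph's generic Writable types (class and method names here are illustrative, not the patch's actual code):

```java
import java.io.*;
import java.util.*;

public class SequentialFormat {
    /** Writes a sorted map of vertexId -> messages in the format above. */
    static void write(DataOutput out, SortedMap<Long, List<Double>> messages)
            throws IOException {
        out.writeInt(messages.size());              // numberOfVertices
        for (Map.Entry<Long, List<Double>> e : messages.entrySet()) {
            out.writeLong(e.getKey());              // vertexIdN
            out.writeInt(e.getValue().size());      // numberOfMessagesN
            for (double m : e.getValue()) {         // messagesForVertexIdN
                out.writeDouble(m);
            }
        }
    }

    /** Reads one file back. For simplicity this materialises the whole
        map; the real store only needs one vertex's messages in memory
        at a time, since the ids are sorted. */
    static SortedMap<Long, List<Double>> read(DataInput in) throws IOException {
        SortedMap<Long, List<Double>> result = new TreeMap<>();
        int numVertices = in.readInt();
        for (int i = 0; i < numVertices; i++) {
            long vertexId = in.readLong();
            int numMessages = in.readInt();
            List<Double> msgs = new ArrayList<>(numMessages);
            for (int j = 0; j < numMessages; j++) {
                msgs.add(in.readDouble());
            }
            result.put(vertexId, msgs);
        }
        return result;
    }
}
```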
DiskBackedMessageStoreByPartition handles all messages,
DiskBackedMessageStore is then used for a single partition, and
SequentialFileMessageStore handles a single file.
There is also SimpleMessageStore, which doesn't use disk at all.
Options available to user:
- whether or not to use out-of-core messaging
- number of messages to keep in memory - this should probably be
changed (explained below)
- size of buffer when reading from and writing to disk
ServerData now has two instances of message stores: one which is
consumed in the current superstep, holding the messages from the
previous superstep, and one which collects incoming messages for the
next superstep.
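That two-store arrangement is essentially a double buffer swapped at the superstep boundary. A sketch with plain maps standing in for the MessageStore implementations (names are illustrative):

```java
import java.util.*;

public class DoubleBufferedMessages {
    // Consumed during the current superstep (filled in the previous one).
    private Map<Long, List<Double>> currentMessages = new HashMap<>();
    // Collects messages arriving for the next superstep.
    private Map<Long, List<Double>> incomingMessages = new HashMap<>();

    /** Stores a message destined for the NEXT superstep. */
    void addIncomingMessage(long vertexId, double message) {
        incomingMessages.computeIfAbsent(vertexId, id -> new ArrayList<>())
            .add(message);
    }

    /** Messages a vertex consumes in the CURRENT superstep. */
    List<Double> getMessages(long vertexId) {
        return currentMessages.getOrDefault(vertexId, Collections.emptyList());
    }

    /** Called at the superstep boundary: incoming becomes current. */
    void prepareSuperstep() {
        currentMessages = incomingMessages;
        incomingMessages = new HashMap<>();
    }
}
```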
Other things which had to be changed:
- Checkpointing - since messages are not kept in the vertex anymore,
they need to be stored separately.
- Partition exchange between workers - same reasons as above - added
SendMessagesRequest
- Messages are not assigned to the vertex anymore; they are just passed
in compute
- compute methods are now executed in order of vertex id inside a
partition, so that we get fast sequential reads from disk
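Executing compute in vertex-id order pays off because each spill file is itself sorted by vertex id, so the per-partition files can be combined with a k-way merge that reads every file strictly sequentially. A rough sketch, with in-memory sorted lists standing in for the files:

```java
import java.util.*;

public class KWayMerge {
    /**
     * Merges several lists of vertex ids, each sorted ascending, into one
     * sorted sequence while advancing a single forward cursor per list --
     * the access pattern a sequential on-disk file allows. Duplicate ids
     * across files appear once per file; the real store would combine
     * their messages at that point.
     */
    static List<Long> mergeSortedIds(List<List<Long>> sortedFiles) {
        // Heap entries: {next vertex id, file index, position in file}.
        PriorityQueue<long[]> heap =
            new PriorityQueue<>(Comparator.comparingLong((long[] e) -> e[0]));
        for (int f = 0; f < sortedFiles.size(); f++) {
            if (!sortedFiles.get(f).isEmpty()) {
                heap.add(new long[] {sortedFiles.get(f).get(0), f, 0});
            }
        }
        List<Long> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            long[] top = heap.poll();
            merged.add(top[0]);
            int file = (int) top[1];
            int next = (int) top[2] + 1;
            if (next < sortedFiles.get(file).size()) {
                heap.add(new long[] {sortedFiles.get(file).get(next), file, next});
            }
        }
        return merged;
    }
}
```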
For the memory check I only have the number of messages which I allow
in memory. This should be done better, but there is a problem since
Alessandro's patch for the out-of-core graph also has memory checks. We
don't want one of those parts to use all the memory and leave too
little space for the other, but I'm not aware of a way to separately
check the memory usage of different data structures.
I didn't integrate this with RPC; that's why there are some checks for
useNetty, which can be removed once RPC is removed. Also, since the
vertex doesn't keep messages in itself anymore, once RPC is removed we
should also remove getMessages/putMessages/getNumMessages from the
vertex, change initialize to (id, value, edges, hasMessages), and just
give messages to the vertex when calling compute.
I'll fix the part where partitions are sent around before a superstep,
since that's the only part now which requires that all the messages for
a single partition fit in memory.
This addresses bug GIRAPH-45.
https://issues.apache.org/jira/browse/GIRAPH-45
Diffs
-----
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java 1363291
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java 1363291
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyServer.java 1363291
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClient.java 1363291
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerClientServer.java 1363291
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/NettyWorkerServer.java 1363291
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/RPCCommunications.java 1363291
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/RequestRegistry.java 1363291
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/SendPartitionMessagesRequest.java 1363291
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/ServerData.java 1363291
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/WorkerServer.java 1363291
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/BasicMessageStore.java PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStore.java PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/DiskBackedMessageStoreByPartition.java PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/FlushableMessageStore.java PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStore.java PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStoreByPartition.java PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SendMessagesRequest.java PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SequentialFileMessageStore.java PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/comm/messages/package-info.java PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java 1363291
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java 1363291
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java 1363291
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java 1363291
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java 1363291
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/graph/partition/Partition.java 1363291
http://svn.apache.org/repos/asf/giraph/trunk/src/main/java/org/apache/giraph/utils/CollectionUtils.java PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/comm/ConnectionTest.java 1363291
http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/comm/RequestTest.java 1363291
http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/comm/TestMessageStores.java PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/src/test/java/org/apache/giraph/utils/MockUtils.java 1363291
Diff: https://reviews.apache.org/r/6013/diff/
Testing
-------
Ran mvn verify and the tests in pseudo-distributed mode; all pass apart
from this one: https://issues.apache.org/jira/browse/GIRAPH-259.
Thanks,
Maja Kabiljo
--
Claudio Martella
[email protected]