Hi Maja,
Thanks for publishing your results! Really nice performance
improvement. I have some questions/comments inline.
On 8/1/12 11:43 AM, Maja Kabiljo wrote:
I've run some benchmarks of this solution; the results are in the Excel
document attached. There are results from PageRankBenchmark and
RandomMessagesBenchmark. Sheets 'Page Rank 3', 'Page Rank 4' and
'Messages 3' show the cases in which we run out of memory. The Shortest
Paths algorithm uses very few messages compared to the amount of other
data, so I couldn't see any difference between the solutions there.
The interesting cases are 'Page Rank 2' and 'Messages 2', where I guess we
are very tight on memory, so going out-of-core helps (I've rerun those a
few times since, but I keep getting the same results).
Are you saying that out-of-core is faster than hitting memory boundaries
(i.e. GC)? It is a bit tough to imagine that out-of-core beats in-core =).
We can also see that execution
time improves with just SimpleMessageStore, since in the current
implementation we copy messages around when we store them in the vertex.
So the performance difference can be explained by reducing memory copies?
I also tried running RandomMessagesBenchmark with a really huge number of
messages, but it crashed because the message store didn't process messages
fast enough and the worker got flooded with unprocessed requests. In cases
like that, the only thing that could help us would be to slow down the
compute executions. But I think this is something that shouldn't happen in
real applications: this benchmark doesn't use the received messages at
all, and in a real application executions are going to be slower anyway if
they have to process that much data. Still, it would be good to have a
real problem which uses messages intensively, so we could see what's
really going on.
A question here: could we have set the max messages option to a lower
value to prevent the crash? What error did you actually see in this case?
As a conclusion: to start with, maybe I can create a smaller patch from
this which only adds SimpleMessageStore, since, as we can see, keeping
messages outside of vertices helps. Then, once RPC is removed, we'll
finally be able to remove the putMessages/getMessages/getNumMessages
functions from Vertex.
I think some folks are really going to like that. It can allow them to
directly implement MutableVertex (I think).
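Purely to illustrate the direction, here is a rough sketch of the kind of
API this could enable once messages live in a message store; the class and
method names below are made up, not from the patch:

    // Hypothetical vertex API: the framework hands in the current
    // superstep's messages, so Vertex no longer needs
    // putMessages/getMessages/getNumMessages.
    public abstract class SimpleVertex<M extends Writable> {
      public abstract void compute(Iterable<M> messages) throws IOException;
    }

    // A user vertex would then just consume the iterable, e.g.:
    public void compute(Iterable<DoubleWritable> messages) throws IOException {
      double sum = 0;
      for (DoubleWritable message : messages) {
        sum += message.get();
      }
      // ... update the vertex value, send messages, vote to halt ...
    }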
For the out-of-core part, if it stays optional and off by default, I see
no harm in adding it as well, and as you can see there are benefits in
some cases.
I don't see any harm here at all.
Another thing: I think I should explain what I'm actually using here from
the GIRAPH-45 discussion, since I don't use Bloom filters or B-trees. It
works as follows:
- Inside the outer message store we have a separate message store for each
partition.
- A partition message store keeps its data in a map ordered by vertex id.
- In the outer message store we check whether we should flush something
(i.e. whether we have more than the allowed number of messages in memory).
While we do, we flush the partition with the largest number of messages in
memory.
- When a partition message store is flushed, all of its data is written to
a file in vertex id order; the file content looks like:
num_vertices
vertex_1_id num_messages_1 message_1_1 message_1_2 ...
vertex_2_id num_messages_2 message_2_1 message_2_2 ...
...
- In the end, each partition will have some messages in memory, plus N
files, where N is the number of times it was flushed.
- When it's time to do the computation, within a single partition we call
the compute methods in vertex id order.
- We use buffered streams and read the data from all partition files
sequentially, since we need the data in the same order it was written in
each file. This way we limit the number of random file accesses. (A
simplified sketch of this scheme follows below.)
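To make the scheme concrete, here is a heavily simplified, illustrative
sketch of one partition's store with the flush path; it is not the actual
patch (names are made up, messages are plain strings, and error handling
and the merge-read are reduced to comments):

    import java.io.*;
    import java.util.*;

    // One partition's message store: messages are kept in a map ordered
    // by vertex id, so a flush can write them out in vertex id order.
    class PartitionMessageStore {
      private final TreeMap<Long, List<String>> inMemory =
          new TreeMap<Long, List<String>>();
      private final List<File> flushedFiles = new ArrayList<File>();

      void addMessage(long vertexId, String message) {
        List<String> messages = inMemory.get(vertexId);
        if (messages == null) {
          messages = new ArrayList<String>();
          inMemory.put(vertexId, messages);
        }
        messages.add(message);
      }

      int numInMemoryMessages() {
        int count = 0;
        for (List<String> messages : inMemory.values()) {
          count += messages.size();
        }
        return count;
      }

      // Write everything currently in memory to a new file, in vertex id
      // order, using the format described above:
      //   num_vertices
      //   vertex_id num_messages message_1 message_2 ...
      void flush(File file) throws IOException {
        DataOutputStream out = new DataOutputStream(
            new BufferedOutputStream(new FileOutputStream(file)));
        try {
          out.writeInt(inMemory.size());
          for (Map.Entry<Long, List<String>> entry : inMemory.entrySet()) {
            out.writeLong(entry.getKey());
            out.writeInt(entry.getValue().size());
            for (String message : entry.getValue()) {
              out.writeUTF(message);
            }
          }
        } finally {
          out.close();
        }
        flushedFiles.add(file);
        inMemory.clear();
      }

      // At compute time, vertices are processed in vertex id order, so
      // each flushed file (sorted the same way) is read strictly
      // sequentially through a buffered stream; a vertex's messages are
      // the merge of its in-memory list and the next matching record
      // from each of the N files.
    }

The outer store's flush policy from the list above is then just:

    // While over the global in-memory limit, flush the partition that
    // currently holds the most in-memory messages.
    while (totalInMemoryMessages() > maxInMemoryMessages) {
      partitionWithMostMessages().flush(newFlushFile());
    }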
Maja
On 7/24/12 1:45 AM, "Avery Ching" <[email protected]> wrote:
We should integrate the partitioning of the graph into the input
superstep to get locality as well. We can use MapReduce to try to
schedule the map tasks (workers) closest to their data, and then make the
workers smart enough to load only their own partitions.
On 7/22/12 4:30 PM, Claudio Martella wrote:
I see your reasoning. In general I'm open to using MR when
necessary (e.g. I used to propose it instead of the automatic vertex
creation), but here it could get tricky. I see the additional HDFS usage
as a downside (you have to be able to store 2x the graph). However, once
the graph is pre-partitioned, this additional stage would not be necessary
again for successive jobs (only when a different number of workers
is used). Still, it does add a not-so-small step to the process.
On Sun, Jul 22, 2012 at 10:49 PM, Alessandro Presta <[email protected]> wrote:
Exactly. On paper, the amount of data around should be the same as during
the computation, but in practice we do use a lot more memory. You can
play with the settings and just push the problem a little farther away,
by caching less and flushing requests more frequently, so that the
bottleneck is on the servers.
We're basically sending (k-1)/k of the graph through the network, where k
is the number of workers.
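(To put a number on that: assuming vertices are spread uniformly across
workers, a vertex lands on the worker that read it with probability 1/k,
so with k = 10 workers roughly 9/10 = 90% of the graph crosses the
network, and going to k = 50 only pushes that to 98%. For any reasonably
large cluster we effectively ship the whole graph.)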
What I'm thinking is that in INPUT_SUPERSTEP we're doing what MapReduce
is really good at (sorting and aggregating) in a probably inefficient (or
at least non-scalable) way.
We could try implementing it with a MapReduce job instead, where the
mappers take input splits and emit (partition_id, vertex) (they would
have access to the partitioner) and the reducers just output the built
partitions to HDFS.
The computation stage would then be the usual Giraph job, where each
worker knows where to get its partitions from HDFS.
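A rough, purely illustrative sketch of what that pre-partitioning job
could look like (the config key, the hash partitioning, the Text-based
vertex encoding, and keying the map input by vertex id are all stand-ins,
not a real Giraph API):

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    public class PrePartitionJob {
      // Mappers read input splits and tag each vertex with the partition
      // it belongs to, using the same function the Giraph partitioner
      // would apply (plain hash partitioning here).
      public static class PartitionMapper
          extends Mapper<LongWritable, Text, IntWritable, Text> {
        private int numPartitions;

        @Override
        protected void setup(Context context) {
          numPartitions =
              context.getConfiguration().getInt("giraph.numPartitions", 1);
        }

        @Override
        protected void map(LongWritable vertexId, Text vertexData,
            Context context) throws IOException, InterruptedException {
          int partitionId = (int) (Math.abs(vertexId.get()) % numPartitions);
          context.write(new IntWritable(partitionId), vertexData);
        }
      }

      // Reducers see all vertices of one partition together and just
      // write the built partition out to HDFS, where a Giraph worker can
      // later load it wholesale.
      public static class PartitionReducer
          extends Reducer<IntWritable, Text, IntWritable, Text> {
        @Override
        protected void reduce(IntWritable partitionId, Iterable<Text> vertices,
            Context context) throws IOException, InterruptedException {
          for (Text vertex : vertices) {
            context.write(partitionId, vertex);
          }
        }
      }
    }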
I can try making this change and see how it goes. It would just be one MR
job, so we're not selling our souls to iterative MR.
I can also see many cases where one might not want to shuffle vertices
around at all: each worker reads a roughly equal part of the input
(forget about bigger vertices for now) and simply communicates its own
vertex ids to the master. Partition "a posteriori" instead of "a priori".
What do you think?
On 7/20/12 9:42 PM, "Eli Reisman" <[email protected]> wrote:
What we are seeing in the metrics is the three-way load of:
1. reading InputSplits from HDFS (mostly over the wire, as there is no
locality right now)
2. creating temporary collections of vertices and sending them on Netty
3. simultaneously receiving collections of vertices on Netty from remote
nodes, which will be placed in the local workers' partitions for the
processing stages