[ https://issues.apache.org/jira/browse/GIRAPH-322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13453196#comment-13453196 ]

Eli Reisman commented on GIRAPH-322:
------------------------------------

Thanks Maja, great advice. I will check into all these things. I know exactly
what you mean about the spill to disk; it will require some thought to
integrate. That has been the problem with this whole approach: Giraph is
hard-wired to Partition -> Vertex -> List<M>, and broadcast-friendly data
structures really go against the grain.

The other issue that makes spilling to disk tricky is that once spilled, each
M object is written out in full every time it appears (not as a referenceId or
something), and when read back the copies are "fresh instances" again, not
references to the same object. So we get a lot of re-duplication on reads from
disk. Eliminating this problem will require some thought and overhauls.
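To make the re-duplication point concrete, here is a minimal sketch of a spill format that avoids it, assuming messages can be tracked by object identity. Each distinct message instance is written once into a table and each vertex's inbox stores only indices, so on read-back the vertices that shared a message share one instance again. DedupSpill, write() and read() are hypothetical names, and plain Strings stand in for Giraph's M type; this is not how Giraph's spill path currently works.

```java
import java.io.*;
import java.util.*;

/**
 * Hypothetical spill format: each distinct message object is written once and
 * referenced by index, so shared instances survive a round trip to disk.
 * Strings stand in for Giraph's M type.
 */
class DedupSpill {

  /** Writes vertexId -> messages, serializing each distinct message instance once. */
  static void write(Map<Long, List<String>> inbox, DataOutput out) throws IOException {
    // Index messages by identity (not equals()), mirroring shared references in memory.
    Map<String, Integer> ids = new IdentityHashMap<>();
    List<String> table = new ArrayList<>();
    for (List<String> msgs : inbox.values()) {
      for (String m : msgs) {
        if (!ids.containsKey(m)) {
          ids.put(m, table.size());
          table.add(m);
        }
      }
    }
    // Message table: every distinct message appears exactly once on disk.
    out.writeInt(table.size());
    for (String m : table) {
      out.writeUTF(m);
    }
    // Inboxes: only indices into the table.
    out.writeInt(inbox.size());
    for (Map.Entry<Long, List<String>> e : inbox.entrySet()) {
      out.writeLong(e.getKey());
      out.writeInt(e.getValue().size());
      for (String m : e.getValue()) {
        out.writeInt(ids.get(m));
      }
    }
  }

  /** Reads the format back; vertices that shared a message share one instance again. */
  static Map<Long, List<String>> read(DataInput in) throws IOException {
    int tableSize = in.readInt();
    List<String> table = new ArrayList<>(tableSize);
    for (int i = 0; i < tableSize; i++) {
      table.add(in.readUTF());
    }
    int vertices = in.readInt();
    Map<Long, List<String>> inbox = new LinkedHashMap<>();
    for (int v = 0; v < vertices; v++) {
      long id = in.readLong();
      int n = in.readInt();
      List<String> msgs = new ArrayList<>(n);
      for (int i = 0; i < n; i++) {
        msgs.add(table.get(in.readInt()));
      }
      inbox.put(id, msgs);
    }
    return inbox;
  }

  public static void main(String[] args) throws IOException {
    String broadcast = new String("neighbor-42");
    Map<Long, List<String>> inbox = new LinkedHashMap<>();
    inbox.put(1L, List.of(broadcast));
    inbox.put(2L, List.of(broadcast));

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    write(inbox, out);
    out.flush();
    Map<Long, List<String>> back =
        read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

    // Both vertices point at the same instance after the round trip.
    System.out.println(back.get(1L).get(0) == back.get(2L).get(0)); // true
  }
}
```

The trade-off is one identity map per spill, which is cheap next to writing the same broadcast payload once per receiving vertex.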

Our data already fits in memory well; it's just a matter of getting the
messaging out of an N^2 growth situation and into an (N * a constant) one, where
the amortizing can do the rest of the job. The nice thing about the amortizing
(it's an ugly solution, I know) is that when only some workers are sending on
any given superstep, the steps pass extremely quickly. Once something works
well, I will attempt to generalize the solution. Forcing the amortizing
responsibility onto the application users is not ideal, even if
SimpleTriangleClosingVertex provides an example to copy.
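Some rough numbers for the N^2 vs (N * a constant) difference, as a minimal sketch under two assumptions: a triangle-closing-style superstep where a vertex of degree d broadcasts each of its d neighbor IDs to all d neighbors, and a de-duplicated broadcast that crosses the network only once per destination partition (P partitions). BroadcastCost and its methods are hypothetical names for illustration.

```java
/**
 * Back-of-the-envelope network message counts for one vertex of degree d,
 * assuming it broadcasts each of its d neighbor IDs to all d neighbors.
 * p is the number of partitions (hypothetical model, not Giraph code).
 */
class BroadcastCost {

  /** One copy of every broadcast per destination vertex: d * d. */
  static long naive(long d) {
    return d * d;
  }

  /** Upper bound with one copy per destination partition: d * min(d, p). */
  static long perPartition(long d, long p) {
    return d * Math.min(d, p);
  }

  public static void main(String[] args) {
    long d = 10_000;
    long p = 50;
    System.out.println(naive(d));           // 100000000
    System.out.println(perPartition(d, p)); // 500000
  }
}
```

The min(d, p) term is only a bound: a vertex's neighbors may land in fewer than p partitions, in which case the de-duplicated count is lower still.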

Thanks for the tip about the constructor; the one from the original class (if I
remember right) was just empty braces, so I left it as the default. Nice save!

So you're saying that if I set the command-line opts correctly, the
Vertex#compute() cycle will actually grind to a halt on each worker while the
message system copies to/from disk whenever messages pile up? If so, this is
definitely worth playing with. I did not see that behavior when I was trying to
run the disk spill options; maybe I just set it up wrong? Without the compute
cycle stopping during spill reads/writes, the messages just pile up in memory
instead of on the network. Either way, the re-duplication on disk read is going
to keep that from being a one-stop solution for us.

As for 1 partition per worker: yes, funnily enough, I considered an alternate
JIRA when I was coding this to address the SendMessageCache issue you mentioned.
The reason for lots of small partitions per worker was to redistribute them
evenly on a worker crash. Using 1 per worker for this purpose hasn't really hurt
us, since it drastically reduces the messages required per worker, and Giraph in
its current form doesn't redistribute partitions on a worker crash anyway: a
crash just ends the job run. Still, a fix for both message caches in this
regard is a great idea; I will take a second look at this.

As for how I think the folks on this end want the final solution to work, we're
working toward "spill to disk only when there's no other choice." So if spilling
is part of the solution, it should ideally be there just as an emergency buffer
against overload that kicks in once in a while. But just getting it to work at
the scale we want is my main goal; we can tune the use case after that!
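One way to read "spill to disk only when there's no other choice" is a buffer that stays purely in memory until a threshold is crossed and only then starts appending the overflow to disk. A minimal sketch under that assumption: EmergencySpillBuffer and its maxInMemory threshold are hypothetical, messages are Strings, and spilled copies are written in full, so this alone still has the re-duplication-on-read problem described above.

```java
import java.io.*;
import java.util.*;

/**
 * Hypothetical "emergency" buffer: messages stay in memory until maxInMemory
 * is reached; only the overflow is appended to a temporary spill file.
 */
class EmergencySpillBuffer implements Closeable {
  private final int maxInMemory;          // hypothetical tuning knob
  private final List<String> inMemory = new ArrayList<>();
  private final File spillFile;
  private DataOutputStream spillOut;      // opened lazily on first overflow
  private int spilled;

  EmergencySpillBuffer(int maxInMemory) throws IOException {
    this.maxInMemory = maxInMemory;
    this.spillFile = File.createTempFile("messages", ".spill");
    this.spillFile.deleteOnExit();
  }

  void add(String message) throws IOException {
    if (inMemory.size() < maxInMemory) {
      inMemory.add(message);              // normal case: no disk I/O at all
      return;
    }
    if (spillOut == null) {
      spillOut = new DataOutputStream(
          new BufferedOutputStream(new FileOutputStream(spillFile)));
    }
    spillOut.writeUTF(message);           // emergency path: full copy on disk
    spilled++;
  }

  int spilledCount() {
    return spilled;
  }

  /** Returns all messages: in-memory ones first, then any spilled overflow. */
  List<String> drain() throws IOException {
    List<String> all = new ArrayList<>(inMemory);
    if (spillOut != null) {
      spillOut.flush();
      try (DataInputStream in = new DataInputStream(
          new BufferedInputStream(new FileInputStream(spillFile)))) {
        for (int i = 0; i < spilled; i++) {
          all.add(in.readUTF());          // each read creates a fresh instance
        }
      }
    }
    return all;
  }

  @Override
  public void close() throws IOException {
    if (spillOut != null) {
      spillOut.close();
    }
    spillFile.delete();
  }

  public static void main(String[] args) throws IOException {
    try (EmergencySpillBuffer buf = new EmergencySpillBuffer(2)) {
      for (String m : List.of("a", "b", "c", "d")) {
        buf.add(m);
      }
      System.out.println(buf.spilledCount()); // 2
      System.out.println(buf.drain());        // [a, b, c, d]
    }
  }
}
```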

OK, off to try this stuff...thanks so much for your thoughtful input!
                
> Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of 
> control message growth in large scale jobs
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: GIRAPH-322
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-322
>             Project: Giraph
>          Issue Type: Improvement
>          Components: bsp
>    Affects Versions: 0.2.0
>            Reporter: Eli Reisman
>            Assignee: Eli Reisman
>            Priority: Minor
>             Fix For: 0.2.0
>
>         Attachments: GIRAPH-322-1.patch, GIRAPH-322-2.patch
>
>
> Vertex#sendMessageToAllEdges is a case that goes against the grain of the 
> data structures and code paths used to transport messages through a Giraph 
> application and out on the network. While messages to a single vertex can be 
> combined (and should be) in some applications that could make use of this 
> broadcast messaging, the out-of-control message growth of algorithms like 
> triangle closing means we need to de-duplicate messages bound for many 
> vertices/partitions.
> This will be an evolving solution (this first patch is just the first step) 
> and currently it does not present a robust solution for disk-spill message 
> stores. I figure I can get some advice about that or it can be a follow-up 
> JIRA if this turns out to be a fruitful pursuit. This first patch is also 
> Netty-only and simply defaults to the old sendMessageToAllEdges() 
> implementation if USE_NETTY is false. All this can be cleaned up when we know 
> this works and/or is worth pursuing.
> The idea is to send as few broadcast messages as possible by run-length 
> encoding their delivery and only duplicating messages on the network when they 
> are bound for different partitions. This is also best when combined with 
> "-Dhash.userPartitionCount=# of workers" so you don't do too much of that.
> If this shows promise I will report back and keep working on this. As it is, 
> it represents an end-to-end solution, using Netty, for in-memory messaging. 
> It won't break with spill to disk, but you do lose the de-duplicating effect.
> More to follow, comments/ideas welcome. I expect this to change a lot as I 
> test it and ideas/suggestions crop up.
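The run-length/de-duplication idea in the description can be sketched as grouping a broadcast's targets by destination partition, so each partition receives one bundle (the message plus its target vertex IDs) instead of one copy per target. This is a hypothetical sketch, not the patch: PartitionBundles and groupByPartition() are made-up names, and Math.floorMod of the vertex ID's hash is an assumed stand-in for Giraph's hash partitioning. It also shows why "-Dhash.userPartitionCount=# of workers" helps: fewer partitions means fewer copies on the network.

```java
import java.util.*;

/**
 * Hypothetical per-partition de-duplication of a broadcast: one bundle
 * (message + target IDs) per destination partition, not one copy per target.
 */
class PartitionBundles {

  /** Groups target vertex IDs by owning partition (simple hash mod partitionCount). */
  static Map<Integer, List<Long>> groupByPartition(List<Long> targets, int partitionCount) {
    Map<Integer, List<Long>> bundles = new TreeMap<>();
    for (long target : targets) {
      int partition = Math.floorMod(Long.hashCode(target), partitionCount);
      bundles.computeIfAbsent(partition, k -> new ArrayList<>()).add(target);
    }
    return bundles;
  }

  public static void main(String[] args) {
    List<Long> targets = new ArrayList<>();
    for (long id = 0; id < 1_000; id++) {
      targets.add(id);
    }
    String message = "neighbor-42";

    // Many small partitions: the broadcast is copied once per partition touched.
    System.out.println(groupByPartition(targets, 120).size()); // 120

    // One partition per worker (say 4 workers): only 4 copies cross the network.
    for (Map.Entry<Integer, List<Long>> e : groupByPartition(targets, 4).entrySet()) {
      System.out.println("partition " + e.getKey() + ": send '" + message
          + "' once for " + e.getValue().size() + " targets");
    }
  }
}
```

On the receiving side each bundle would be expanded into per-vertex deliveries that all reference the same message instance, which is where the in-memory savings come from.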

