[
https://issues.apache.org/jira/browse/GIRAPH-322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Eli Reisman updated GIRAPH-322:
-------------------------------
Attachment: GIRAPH-322-3.patch
Thanks Maja, I see now that the reflection constructor is probably the issue;
good catch! I agree about the message cache and partition issues. I think
there's a big benefit to sending messages "one per worker" and then figuring
out the partition for sub-delivery on the receiving side. I was reluctant to
change that in this patch since I'm new to the messaging code, and since the
HashPartitioner provides the CLI option I figured I should try it out that way
and explore other solutions in another JIRA, or after Avery says it's OK to
hack the existing solution ;)
This is also why the message/destination data gets split up after being
grouped, as you mentioned. It always has to be grouped by partition (with
message dupes sent individually per partition) because all messages, on both
sides, are organized around partitions. This is good if partitions get swapped
in the middle of a superstep, or if a worker dies and we need a restart story,
but for this use case it does add some extra work and memory for no gain. As
long as the partitioning is set to one per worker, we end up doing some extra
work but not sending extra messages, which is good enough for now.
We should bounce around some ideas about this, though; it's not an ideal
situation.
> Run Length Encoding for Vertex#sendMessageToAllEdges might curb out of
> control message growth in large scale jobs
> -----------------------------------------------------------------------------------------------------------------
>
> Key: GIRAPH-322
> URL: https://issues.apache.org/jira/browse/GIRAPH-322
> Project: Giraph
> Issue Type: Improvement
> Components: bsp
> Affects Versions: 0.2.0
> Reporter: Eli Reisman
> Assignee: Eli Reisman
> Priority: Minor
> Fix For: 0.2.0
>
> Attachments: GIRAPH-322-1.patch, GIRAPH-322-2.patch,
> GIRAPH-322-3.patch
>
>
> Vertex#sendMessageToAllEdges is a case that goes against the grain of the
> data structures and code paths used to transport messages through a Giraph
> application and out on the network. While messages to a single vertex can
> (and should) be combined in some applications that could make use of this
> broadcast messaging, the out-of-control message growth of algorithms like
> triangle closing means we need to de-duplicate messages bound for many
> vertices/partitions.
> This will be an evolving solution (this first patch is just the first step)
> and currently it does not present a robust solution for disk-spill message
> stores. I figure I can get some advice about that or it can be a follow-up
> JIRA if this turns out to be a fruitful pursuit. This first patch is also
> Netty-only and simply defaults to the old sendMessageToAllEdges()
> implementation if USE_NETTY is false. All this can be cleaned up when we know
> this works and/or is worth pursuing.
> The idea is to send as few broadcast messages as possible by run-length
> encoding their delivery and only duplicating messages on the network when
> they are bound for different partitions. This works best when combined with
> "-Dhash.userPartitionCount=# of workers", so that each worker holds a single
> partition and little cross-partition duplication occurs.
> If this shows promise I will report back and keep working on this. As it is,
> it represents an end-to-end solution, using Netty, for in-memory messaging.
> It won't break with spill to disk, but you do lose the de-duplicating effect.
> More to follow, comments/ideas welcome. I expect this to change a lot as I
> test it and ideas/suggestions crop up.
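The per-partition encoding described above can be sketched roughly as follows
(a minimal illustration with hypothetical names, not the actual patch code):
instead of enqueueing one copy of the message per out-edge, the target vertex
ids are grouped by destination partition, and the payload travels once per
partition alongside the id list. Simple hash partitioning is assumed here.

```java
import java.util.*;

// Hypothetical sketch (class and method names are illustrative, not Giraph's
// actual API): group broadcast targets by destination partition so the
// message payload is duplicated once per partition, not once per out-edge.
public class BroadcastEncoder {

    // Encode a broadcast as one (partition -> targetIds) entry per partition.
    public static Map<Integer, List<Long>> groupTargetsByPartition(
            List<Long> targetVertexIds, int partitionCount) {
        Map<Integer, List<Long>> byPartition = new HashMap<>();
        for (long id : targetVertexIds) {
            // Simple hash partitioning, mirroring the HashPartitioner idea.
            int partition = (int) (Math.abs(id) % partitionCount);
            byPartition.computeIfAbsent(partition, p -> new ArrayList<>())
                       .add(id);
        }
        return byPartition;
    }

    // Network copies of the payload: one per non-empty partition, rather
    // than one per target vertex.
    public static int payloadCopies(List<Long> targetVertexIds,
                                    int partitionCount) {
        return groupTargetsByPartition(targetVertexIds, partitionCount).size();
    }
}
```

With hash.userPartitionCount set to the worker count, each worker owns one
partition, so a broadcast to N vertices costs at most one payload copy per
worker instead of N.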
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira