Inconsistent message map handling in BasicRPCCommunications.LargeMessageFlushExecutor
-------------------------------------------------------------------------------------

                 Key: GIRAPH-114
                 URL: https://issues.apache.org/jira/browse/GIRAPH-114
             Project: Giraph
          Issue Type: Bug
    Affects Versions: 0.70.0
            Reporter: Sebastian Schelter
            Priority: Critical
         Attachments: GIRAPH-114.patch

I'm currently implementing a simple algorithm to identify all the connected 
components of a graph. The algorithm ran well in local IDE unit tests on toy 
data and on a local single-node Hadoop instance using a graph of ~100k edges.

When I tested it on a real cluster with the Wikipedia pagelink graph (5.7M 
vertices, 130M edges), I ran into strange exceptions like this:

{noformat} 
2011-12-21 12:03:57,015 INFO org.apache.hadoop.mapred.TaskInProgress: Error from attempt_201112131541_0034_m_000027_0: java.lang.IllegalStateException: run: Caught an unrecoverable exception flush: Got ExecutionException
        at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:641)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
        at org.apache.hadoop.mapred.Child.main(Child.java:253)
Caused by: java.lang.IllegalStateException: flush: Got ExecutionException
        at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:946)
        at org.apache.giraph.graph.BspServiceWorker.finishSuperstep(BspServiceWorker.java:916)
        at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:588)
        at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:632)
        ... 7 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: run: Impossible for no messages in 1603276
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
        at java.util.concurrent.FutureTask.get(FutureTask.java:83)
        at org.apache.giraph.comm.BasicRPCCommunications.flush(BasicRPCCommunications.java:941)
        ... 10 more
Caused by: java.lang.IllegalStateException: run: Impossible for no messages in 1603276
        at org.apache.giraph.comm.BasicRPCCommunications$PeerFlushExecutor.run(BasicRPCCommunications.java:245)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)
{noformat} 

The exception is thrown because the data structure holding the outgoing 
messages contains an entry for a vertex whose message list is empty.

I tracked this behavior down:


In *BasicRPCCommunications:541-546* the map holding the outgoing messages for 
vertices of a particular machine is created. It's stored in two places: in 
_BasicRPCCommunications.outMessages_ and as the member variable 
_outMessagesPerPeer_ of its _PeerConnection_:

{noformat} 
outMsgMap = new HashMap<I, MsgList<M>>();
outMessages.put(addrUnresolved, outMsgMap);

PeerConnection peerConnection = new PeerConnection(outMsgMap, peer, isProxy);
{noformat} 
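
To make the aliasing concrete, here is a minimal standalone sketch. It is not Giraph code: the class _Peer_, the key "peer-0" and the use of String in place of the generic types I and M are stand-ins assumed for illustration. It only shows that a write made through one reference is visible through the other, because both point at the same map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SharedMapSketch {
    /** Hypothetical stand-in for PeerConnection: it keeps a reference, not a copy. */
    static final class Peer {
        final Map<String, List<String>> outMessagesPerPeer;

        Peer(Map<String, List<String>> outMessagesPerPeer) {
            this.outMessagesPerPeer = outMessagesPerPeer;
        }
    }

    /** Returns true if a write made through the peer shows up in the outer map. */
    static boolean writeIsShared() {
        Map<String, Map<String, List<String>>> outMessages = new HashMap<>();
        Map<String, List<String>> outMsgMap = new HashMap<>();
        outMessages.put("peer-0", outMsgMap);   // first holder of the map
        Peer peer = new Peer(outMsgMap);        // second holder of the same map

        // Modify the map through the peer only.
        peer.outMessagesPerPeer.put("v1", new ArrayList<>());

        // The entry is visible through the other holder as well.
        return outMessages.get("peer-0").containsKey("v1");
    }

    public static void main(String[] args) {
        System.out.println(writeIsShared()); // prints "true"
    }
}
```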
        
When a lot of messages have accumulated for a particular vertex, a large 
flush is triggered via _LargeMessageFlushExecutor_ (I guess this only 
happened in the Wikipedia test). During this flush the list of messages for the 
vertex is sent out and replaced with an empty list in 
*BasicRPCCommunications:341*:

{noformat}
outMessageList = peerConnection.outMessagesPerPeer.get(destVertex);
peerConnection.outMessagesPerPeer.put(destVertex, new MsgList<M>());
{noformat}

Now, in the last flush that is triggered at the end of the superstep, we 
encounter an empty message list for the vertex, and therefore the exception is 
thrown in *BasicRPCCommunications:228-247*:

{noformat}
for (Entry<I, MsgList<M>> entry : peerConnection.outMessagesPerPeer.entrySet()) {
  ...
  if (entry.getValue().isEmpty()) {
    throw new IllegalStateException(...);
  }
}
{noformat}

Simply removing the list for the vertex when executing the large flush solved 
the issue (patch to come).
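
The difference between the two behaviours can be reproduced in isolation. The sketch below is not the attached patch and not Giraph code. The method names (_largeFlush_, _finalFlush_, _superstepSucceeds_) and the String types are assumptions made for illustration. It only models the interplay described above: leaving an empty list behind makes the final flush fail, while removing the entry does not.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LargeFlushSketch {
    // Hypothetical stand-in for peerConnection.outMessagesPerPeer.
    static final Map<String, List<String>> outMessagesPerPeer = new HashMap<>();

    /** Large flush: hands out the pending list, then either removes the entry or leaves an empty list. */
    static List<String> largeFlush(String destVertex, boolean removeEntry) {
        if (removeEntry) {
            // Proposed behaviour: no entry remains for the vertex.
            return outMessagesPerPeer.remove(destVertex);
        }
        // Reported behaviour: an empty list stays in the map.
        List<String> pending = outMessagesPerPeer.get(destVertex);
        outMessagesPerPeer.put(destVertex, new ArrayList<>());
        return pending;
    }

    /** End-of-superstep flush: mirrors the check that rejects empty lists. */
    static void finalFlush() {
        for (Map.Entry<String, List<String>> entry : outMessagesPerPeer.entrySet()) {
            if (entry.getValue().isEmpty()) {
                throw new IllegalStateException(
                    "Impossible for no messages in " + entry.getKey());
            }
        }
        outMessagesPerPeer.clear();
    }

    /** Runs one large flush followed by the final flush; true if the final flush did not throw. */
    static boolean superstepSucceeds(boolean removeEntry) {
        outMessagesPerPeer.clear();
        List<String> messages = new ArrayList<>();
        messages.add("m1");
        outMessagesPerPeer.put("1603276", messages);
        largeFlush("1603276", removeEntry);
        try {
            finalFlush();
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(superstepSucceeds(false)); // prints "false"
        System.out.println(superstepSucceeds(true));  // prints "true"
    }
}
```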

I'd like to note that it is generally very dangerous to let different classes 
access a data structure directly, as it produces subtle bugs like this. 
It would be better to think of a centralized way of handling the data structure. 
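
One possible shape for such centralized handling is sketched below. This is only an illustration, not a proposal for the actual Giraph API: the class name _OutgoingMessageBuffer_, its methods and the String types are all hypothetical. The idea is that a single class owns the map and guarantees that it never contains an empty list, so neither kind of flush can observe one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical single owner of the outgoing messages; callers never see the raw map. */
public class OutgoingMessageBuffer {
    private final Map<String, List<String>> pending = new HashMap<>();

    /** Queues a message and returns how many are now pending for the vertex. */
    public synchronized int add(String destVertex, String message) {
        List<String> list = pending.computeIfAbsent(destVertex, k -> new ArrayList<>());
        list.add(message);
        return list.size();
    }

    /** Large flush: removes and returns the messages of one vertex (empty list if none). */
    public synchronized List<String> drain(String destVertex) {
        List<String> list = pending.remove(destVertex);
        return list == null ? new ArrayList<>() : list;
    }

    /** Final flush: removes and returns everything that is still pending. */
    public synchronized Map<String, List<String>> drainAll() {
        Map<String, List<String>> snapshot = new HashMap<>(pending);
        pending.clear();
        return snapshot;
    }

    public static void main(String[] args) {
        OutgoingMessageBuffer buffer = new OutgoingMessageBuffer();
        buffer.add("v1", "a");
        buffer.add("v1", "b");
        System.out.println(buffer.drain("v1"));           // prints "[a, b]"
        System.out.println(buffer.drainAll().isEmpty());  // prints "true"
    }
}
```

Because entries are created only in _add_ and leave the map only through _drain_ or _drainAll_, the "no empty list" invariant is enforced in one place instead of relying on every caller.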


