Author: hyunsik
Date: Wed Oct  5 00:41:27 2011
New Revision: 1179029

URL: http://svn.apache.org/viewvc?rev=1179029&view=rev
Log:
GIRAPH-12: Investigate communication improvements (hyunsik)

Modified:
    incubator/giraph/trunk/CHANGELOG
    incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/MsgList.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1179029&r1=1179028&r2=1179029&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Wed Oct  5 00:41:27 2011
@@ -2,6 +2,8 @@ Giraph Change Log
 
 Release 0.70.0 - unreleased
 
+  GIRAPH-12: Investigate communication improvements. (hyunsik)
+
   GIRAPH-46: Race condition on superstep 1 with RPC servers not
   started by the time that requests are sent. (aching)
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java?rev=1179029&r1=1179028&r2=1179029&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java Wed Oct  5 00:41:27 2011
@@ -40,7 +40,7 @@ import java.util.Random;
  */
 public class RandomMessageBenchmark extends
         Vertex<LongWritable, DoubleWritable, DoubleWritable, BytesWritable>
-        implements Tool {    
+        implements Tool {
     /** Configuration from Configurable */
     private Configuration conf;
 
@@ -121,6 +121,10 @@ public class RandomMessageBenchmark exte
                 "edgesPerVertex",
                 true,
                 "Edges per vertex");
+        options.addOption("f",
+                "flusher",
+                true,
+                "Number of flush threads");
         
         HelpFormatter formatter = new HelpFormatter();
         if (args.length == 0) {
@@ -160,6 +164,7 @@ public class RandomMessageBenchmark exte
         }
         int workers = Integer.parseInt(cmd.getOptionValue('w'));
         GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+        job.getConfiguration().setInt(GiraphJob.CHECKPOINT_FREQUENCY, 0);
         job.setVertexClass(getClass());
         job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
         job.setWorkerConfiguration(workers, workers, 100.0f);
@@ -187,6 +192,10 @@ public class RandomMessageBenchmark exte
             getConf().setInt(SUPERSTEP_COUNT,
                              Integer.parseInt(cmd.getOptionValue('s')));
         }
+        if (cmd.hasOption('f')) {
+            job.getConfiguration().setInt(GiraphJob.MSG_NUM_FLUSH_THREADS, 
+                Integer.parseInt(cmd.getOptionValue('f')));         
+        }
         if (job.run(isVerbose) == true) {
             return 0;
         } else {

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java?rev=1179029&r1=1179028&r2=1179029&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java Wed Oct  5 00:41:27 2011
@@ -47,6 +47,10 @@ public abstract class ArrayListWritable<
      */
     public ArrayListWritable() {
     }
+    
+    public ArrayListWritable(ArrayListWritable<M> arrayListWritable) {
+        super(arrayListWritable);
+    }
 
     /**
      * This constructor allows setting the refClass during construction.

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1179029&r1=1179028&r2=1179029&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Wed Oct  5 00:41:27 2011
@@ -25,6 +25,8 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -33,6 +35,9 @@ import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.log4j.Logger;
 
@@ -69,8 +74,6 @@ public abstract class BasicRPCCommunicat
     /** Class logger */
     private static final Logger LOG =
         Logger.getLogger(BasicRPCCommunications.class);
-    /** Synchronization object (Between this object and peer threads) */
-    private Object waitingInMain = new Object();
     /** Indicates whether in superstep preparation */
     private boolean inPrepareSuperstep = false;
     /** Local hostname */
@@ -90,10 +93,15 @@ public abstract class BasicRPCCommunicat
     /** Messages sent during the last superstep */
     private long totalMsgsSentInSuperstep = 0;
     /**
-     * Map of threads mapping from remote socket address to RPC client threads
+     * Map of the peer connections, mapping from remote socket address to client
+     * meta data
      */
-    private final Map<InetSocketAddress, PeerThread> peerThreads =
-        new HashMap<InetSocketAddress, PeerThread>();
+    private final Map<InetSocketAddress, PeerConnection> peerConnections =
+        new HashMap<InetSocketAddress, PeerConnection>();
+    /**
+     * Thread pool for message flush threads
+     */
+    private final ExecutorService executor; 
     /**
      * Map of outbound messages, mapping from remote server to
      * destination vertex index to list of messages
@@ -137,285 +145,177 @@ public abstract class BasicRPCCommunicat
         new HashMap<I, InetSocketAddress>();
     /** Maximum size of cached message list, before sending it out */
     private final int maxSize;
-    /** Maximum msecs to hold messages before checking again */
-    private static final int MAX_MESSAGE_HOLDING_MSECS = 2000;
     /** Cached job id */
     private final String jobId;
     /** cached job token */
     private final J jobToken;
     /** maximum number of vertices sent in a single RPC */
     private static final int MAX_VERTICES_PER_RPC = 1024;
-
+    
     /**
-     * Class describing the RPC client thread for every remote RPC server.
-     * It actually marshals and ships the RPC call to the remote RPC server
-     * for actual execution.
+     * PeerConnection contains RPC client and accumulated messages 
+     * for a specific peer.
      */
-    private class PeerThread extends Thread {
+    private class PeerConnection {
         /**
          * Map of outbound messages going to a particular remote server,
          * mapping from vertex range (max vertex index) to list of messages.
          * (Synchronized with itself).
          */
-        private final Map<I, MsgList<M>> outMessagesPerPeer;
+        private final Map<I, MsgList<M>> outMessagesPerPeer;        
         /**
          * Client interface: RPC proxy for remote server, this class for local
          */
         private final CommunicationsInterface<I, V, E, M> peer;
         /** Maximum size of cached message list, before sending it out */
-        private final int maxSize;
         /** Boolean, set to false when local client (self), true otherwise */
         private final boolean isProxy;
-        /** Boolean, set to true when all messages should be flushed */
-        private boolean flush = false;
-        /** associated synchronization object */
-        private final Object flushObject = new Object();
-        /** Boolean, set to true when client should terminate */
-        private boolean notDone = true;
-        /** associated synchronization object */
-        private final Object notDoneObject = new Object();
-        /** Boolean, set to true when there is a large message list to flush */
-        private boolean flushLargeMsgLists = false;
-        /** associated synchronization object */
-        private final Object flushLargeMsgListsObject = new Object();
-        /** Synchronization object */
-        private final Object waitingInPeer = new Object();
-        /** Combiner instance, can be null */
-        private final VertexCombiner<I, M> combiner;
-        /** set of keys of large message list (synchronized with itself) */
-        private final Set<I> largeMsgListKeys = new TreeSet<I>();
-
-        PeerThread(Map<I, MsgList<M>> m,
-                   CommunicationsInterface<I, V, E, M> i,
-                   int maxSize,
-                   boolean isProxy,
-                   VertexCombiner<I, M> combiner) {
-            super(PeerThread.class.getName());
+        
+        public PeerConnection(Map<I, MsgList<M>> m,
+            CommunicationsInterface<I, V, E, M> i,
+            boolean isProxy) {
+            
             this.outMessagesPerPeer = m;
             this.peer = i;
-            this.maxSize = maxSize;
             this.isProxy = isProxy;
-            this.combiner = combiner;
-        }
-
-        public void flushLargeMsgList(I key) {
-            synchronized (largeMsgListKeys) {
-                largeMsgListKeys.add(key);
-            }
-            synchronized (flushLargeMsgListsObject) {
-                flushLargeMsgLists = true;
-            }
-            synchronized (waitingInPeer) {
-                waitingInPeer.notify();
-            }
-        }
-
-        /**
-         * Notify this thread to send issue the put() RPCs.
-         */
-        public void flush() {
-            synchronized (flushObject) {
-                flush = true;
-            }
-            synchronized (waitingInPeer) {
-                waitingInPeer.notify();
-            }
-        }
-
-        public boolean getFlushState() {
-            synchronized(flushObject) {
-                return flush;
-            }
-        }
-
-        public boolean getNotDoneState() {
-            synchronized(notDoneObject) {
-                return notDone;
-            }
         }
-
-        private boolean getFlushMsgListsState() {
-            synchronized (flushLargeMsgListsObject) {
-                return flushLargeMsgLists;
-            }
-        }
-
+        
         public void close() {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("close: Done");
             }
-            synchronized (notDoneObject) {
-                notDone = false;
-            }
-            synchronized (waitingInPeer) {
-                waitingInPeer.notify();
-            }
         }
 
         public CommunicationsInterface<I, V, E, M> getRPCProxy() {
             return peer;
         }
+    }
+    
+    private class PeerFlushExecutor implements Runnable {
+        PeerConnection peerConnection;
 
-        /**
-         * Issue all the RPC put() to the peer (local or remote) for normal
-         * messages.
-         *
-         * @throws IOException
-         */
-        private void putAllMessages() throws IOException {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("putAllMessages: " + peer.getName() +
-                          ": issuing RPCs");
-            }
-            synchronized (outMessagesPerPeer) {
-                for (Entry<I, MsgList<M>> e :
-                        outMessagesPerPeer.entrySet()) {
-                    MsgList<M> msgList = e.getValue();
-                    if (msgList.size() > 0) {
-                        if (msgList.size() > 1) {
-                            if (combiner != null) {
-                                M combinedMsg = combiner.combine(e.getKey(),
-                                                                 msgList);
-                                if (combinedMsg != null) {
-                                    peer.putMsg(e.getKey(), combinedMsg);
-                                }
-                            } else {
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("putAllMessages: " +
-                                              peer.getName() +
-                                              " putting (list) " + msgList +
-                                              " to " + e.getKey() +
-                                              ", proxy = " + isProxy);
-                                }
-                                peer.putMsgList(e.getKey(), msgList);
-                            }
-                            msgList.clear();
-                        } else {
-                            for (M msg : msgList) {
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("putAllMessages: "
-                                              + peer.getName() +
-                                              " putting " + msg +
-                                              " to " + e.getKey() +
-                                              ", proxy = " + isProxy);
-                                }
-                                if (msg == null) {
-                                    throw new IllegalArgumentException(
-                                        "putAllMessages: Cannot put " +
-                                        "null message on " + e.getKey());
-                                }
-                                peer.putMsg(e.getKey(), msg);
-                            }
-                            msgList.clear();
-                        }
-                    }
-                }
-            }
+        PeerFlushExecutor(PeerConnection peerConnection) {
+            this.peerConnection = peerConnection;
         }
 
         @Override
-        public void run() {
+        public void run() {            
+            CommunicationsInterface<I, V, E, M> proxy
+                = peerConnection.getRPCProxy();
+
             try {
-                while (true) {
-                    Set<I> largeMsgListKeysValue = null;
-                    boolean flushValue = getFlushState();
-                    boolean notDoneValue = getNotDoneState();
-                    boolean flushLargeMsgListsValues = getFlushMsgListsState();
-                    while (notDoneValue && !flushValue) {
-                        try {
-                            synchronized (waitingInPeer) {
-                                waitingInPeer.wait(MAX_MESSAGE_HOLDING_MSECS);
-                            }
-                        } catch (InterruptedException e) {
-                            // continue;
-                        }
-                        if (flushLargeMsgListsValues) {
-                            synchronized (largeMsgListKeys) {
-                                largeMsgListKeysValue =
-                                    new TreeSet<I>(largeMsgListKeys);
-                                largeMsgListKeys.clear();
-                            }
-                            synchronized (flushLargeMsgListsObject) {
-                                flushLargeMsgLists = false;
-                            }
-                            LOG.info("run: " + peer.getName() +
-                                     ": flushLargeMsgLists " +
-                                     largeMsgListKeysValue.size());
-                            break;
-                        }
-                        flushValue = getFlushState();
-                        notDoneValue = getNotDoneState();
-                        flushLargeMsgLists = getFlushMsgListsState();
-                    }
-                    if (!notDoneValue) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("run: NotDone=" + notDone +
-                                      ", flush=" + flush);
-                        }
-                        break;
-                    }
+                synchronized (peerConnection.outMessagesPerPeer) {
+                    for (Entry<I, MsgList<M>> e :
+                        peerConnection.outMessagesPerPeer.entrySet()) {
+                        MsgList<M> msgList = e.getValue();
 
-                    if (flushValue) {
-                        putAllMessages();
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("run: " + peer.getName() +
-                                      ": all messages flushed");
-                        }
-                        synchronized (flushObject) {
-                            flush = false;
-                        }
-                        synchronized (flushLargeMsgListsObject) {
-                            flushLargeMsgLists = false;
-                        }
-                        synchronized (waitingInMain) {
-                            waitingInMain.notify();
-                        }
-                    }
-                    else if (largeMsgListKeysValue != null &&
-                            largeMsgListKeysValue.size() > 0) {
-                        for (I destVertex : largeMsgListKeysValue) {
-                            MsgList<M> msgList = null;
-                            synchronized(outMessagesPerPeer) {
-                                msgList = outMessagesPerPeer.get(destVertex);
-                                if (msgList == null ||
-                                        msgList.size() <= maxSize) {
-                                    continue;
-                                }
+                        if (msgList.size() > 0) {
+                            if (msgList.size() > 1) {
                                 if (combiner != null) {
-                                    M combinedMsg = 
combiner.combine(destVertex,
-                                                                     msgList);
+                                    M combinedMsg = 
combiner.combine(e.getKey(),
+                                        msgList);
                                     if (combinedMsg != null) {
-                                        peer.putMsg(destVertex, combinedMsg);
+                                        proxy.putMsg(e.getKey(), combinedMsg);
                                     }
                                 } else {
-                                    peer.putMsgList(destVertex, msgList);
+                                    if (LOG.isDebugEnabled()) {
+                                        LOG.debug("putAllMessages: " +
+                                            proxy.getName() +
+                                            " putting (list) " + msgList +
+                                            " to " + e.getKey() +
+                                            ", proxy = " + 
+                                            peerConnection.isProxy);
+                                    }
+                                    proxy.putMsgList(e.getKey(), msgList);
+                                }
+                                msgList.clear();
+                            } else {
+                                for (M msg : msgList) {
+                                    if (LOG.isDebugEnabled()) {
+                                        LOG.debug("putAllMessages: "
+                                            + proxy.getName() +
+                                            " putting " + msg +
+                                            " to " + e.getKey() +
+                                            ", proxy = " + 
+                                            peerConnection.isProxy);
+                                    }
+                                    if (msg == null) {
+                                        throw new IllegalArgumentException(
+                                            "putAllMessages: Cannot put " +
+                                                "null message on " + 
e.getKey());
+                                    }
+                                    proxy.putMsg(e.getKey(), msg);
                                 }
                                 msgList.clear();
                             }
                         }
                     }
                 }
-                if (isProxy) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("run: RPC client thread terminating...");
-                    }
-                    RPC.stopProxy(peer);
+
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("run: " + proxy.getName() +
+                        ": all messages flushed");
                 }
             } catch (IOException e) {
-                LOG.error(e);
-                synchronized (notDoneObject) {
-                    notDone = false;
+                LOG.error(e);                
+                if (peerConnection.isProxy) {
+                    RPC.stopProxy(peerConnection.peer);
                 }
-                synchronized (waitingInMain) {
-                    waitingInMain.notify();
-                }
-                if (isProxy) {
-                    RPC.stopProxy(peer);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+    
+    /**
+     * LargeMessageFlushExecutor flushes all outgoing messages destined to some vertices.
+     * This is executed when the number of messages destined to certain vertex
+     * exceeds <i>maxSize</i>.
+     */
+    private class LargeMessageFlushExecutor implements Runnable {
+        final I destVertex;
+        final MsgList<M> outMessage;        
+        PeerConnection peerConnection;
+
+        LargeMessageFlushExecutor(PeerConnection peerConnection, I destVertex) 
{
+            this.peerConnection = peerConnection;
+            synchronized (peerConnection.outMessagesPerPeer) {
+                this.destVertex = destVertex;
+                outMessage = peerConnection.outMessagesPerPeer.get(destVertex);
+                peerConnection.outMessagesPerPeer.put(destVertex, new 
MsgList<M>());
+            }
+        }
+
+        @Override
+        public void run() {     
+            try {
+                CommunicationsInterface<I, V, E, M> proxy = 
+                    peerConnection.getRPCProxy();
+                
+                if (combiner != null) {
+                        M combinedMsg = combiner.combine(destVertex,
+                                                         outMessage);
+                        if (combinedMsg != null) {
+                            proxy.putMsg(destVertex, combinedMsg);
+                        }
+                    } else {
+                        proxy.putMsgList(destVertex, outMessage);
+                    }
+            } catch (IOException e) {
+                LOG.error(e);                
+                if (peerConnection.isProxy) {
+                    RPC.stopProxy(peerConnection.peer);
                 }
                 throw new RuntimeException(e);
+            } finally {
+                outMessage.clear();
             }
         }
     }
+    
+    private void submitLargeMessageSend(InetSocketAddress addr, I destVertex) {
+        PeerConnection pc = peerConnections.get(addr);
+        executor.execute(new LargeMessageFlushExecutor(pc, destVertex));
+    }
 
     protected abstract J createJobToken() throws IOException;
 
@@ -467,11 +367,20 @@ public abstract class BasicRPCCommunicat
         this.server.start();
 
         this.myName = myAddress.toString();
+        
+        int numWorkers = conf.getInt(GiraphJob.MAX_WORKERS, numTasks);
+        // if the number of flush threads is unset, it is set to 
+        // the number of max workers.
+        int numFlushThreads = conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS, 
numWorkers-1);
+        this.executor = Executors.newFixedThreadPool(numFlushThreads);
+        
         if (LOG.isInfoEnabled()) {
             LOG.info("BasicRPCCommunications: Started RPC " +
                      "communication server: " + myName + " with " +
-                     numHandlers + " handlers");
+                     numHandlers + " handlers and " + numFlushThreads + 
+                     " flush threads");
         }
+        
         connectAllRPCProxys(this.jobId, this.jobToken);
     }
 
@@ -558,10 +467,9 @@ public abstract class BasicRPCCommunicat
             outMessages.put(addrUnresolved, outMsgMap);
         }
 
-        PeerThread peerThread =
-            new PeerThread(outMsgMap, peer, maxSize, isProxy, combiner);
-        peerThread.start();
-        peerThreads.put(addrUnresolved, peerThread);
+        PeerConnection peerConnection =
+            new PeerConnection(outMsgMap, peer, isProxy);
+        peerConnections.put(addrUnresolved, peerConnection);
     }
 
     @Override
@@ -581,16 +489,8 @@ end[HADOOP_FACEBOOK]*/
 
     @Override
     public void closeConnections() throws IOException {
-        for (PeerThread pt : peerThreads.values()) {
-            pt.close();
-        }
-
-        for (PeerThread pt : peerThreads.values()) {
-            try {
-                pt.join();
-            } catch (InterruptedException e) {
-                LOG.warn(e.getStackTrace());
-            }
+        for(PeerConnection pc : peerConnections.values()) {
+            pc.close();
         }
     }
 
@@ -748,13 +648,14 @@ end[HADOOP_FACEBOOK]*/
             new VertexList<I, V, E, M>();
         InetSocketAddress addr = getInetSocketAddress(vertexIndexMax);
         CommunicationsInterface<I, V, E, M> rpcProxy =
-            peerThreads.get(addr).getRPCProxy();
+            peerConnections.get(addr).getRPCProxy();
+            
         if (LOG.isInfoEnabled()) {
             LOG.info("sendVertexList: Sending to " + rpcProxy.getName() + " " +
                      addr + ", with vertex index " + vertexIndexMax +
                      ", list " + vertexList);
         }
-        if (peerThreads.get(addr).isProxy == false) {
+        if(peerConnections.get(addr).isProxy == false) {
             throw new RuntimeException("sendVertexList: Impossible to send " +
                 "to self for vertex index max " + vertexIndexMax);
         }
@@ -839,7 +740,7 @@ end[HADOOP_FACEBOOK]*/
                           msgList.size());
             }
             if (msgList.size() > maxSize) {
-                peerThreads.get(addr).flushLargeMsgList(destVertex);
+                submitLargeMessageSend(addr, destVertex);
             }
         }
     }
@@ -853,7 +754,7 @@ end[HADOOP_FACEBOOK]*/
                       destVertex + " with address " + addr);
         }
         CommunicationsInterface<I, V, E, M> rpcProxy =
-            peerThreads.get(addr).getRPCProxy();
+            peerConnections.get(addr).getRPCProxy();
         rpcProxy.addEdge(destVertex, edge);
     }
 
@@ -866,7 +767,7 @@ end[HADOOP_FACEBOOK]*/
                       ") from" + vertexIndex + " with address " + addr);
         }
         CommunicationsInterface<I, V, E, M> rpcProxy =
-            peerThreads.get(addr).getRPCProxy();
+            peerConnections.get(addr).getRPCProxy();
         rpcProxy.removeEdge(vertexIndex, destVertexIndex);
     }
 
@@ -879,7 +780,7 @@ end[HADOOP_FACEBOOK]*/
                       " with address " + addr);
         }
         CommunicationsInterface<I, V, E, M> rpcProxy =
-            peerThreads.get(addr).getRPCProxy();
+            peerConnections.get(addr).getRPCProxy();
         rpcProxy.addVertex(vertex);
     }
 
@@ -892,7 +793,7 @@ end[HADOOP_FACEBOOK]*/
                       + vertexIndex + ")  with address " + addr);
         }
         CommunicationsInterface<I, V, E, M> rpcProxy =
-            peerThreads.get(addr).getRPCProxy();
+            peerConnections.get(addr).getRPCProxy();
         rpcProxy.removeVertex(vertexIndex);
     }
 
@@ -904,41 +805,25 @@ end[HADOOP_FACEBOOK]*/
         for (List<M> msgList : inMessages.values()) {
             msgList.clear();
         }
-        for (PeerThread pt : peerThreads.values()) {
-            pt.flush();
+        Collection<Future<?>> futures = new ArrayList<Future<?>>();
+
+        // randomize peers in order to avoid hotspot on racks
+        List<PeerConnection> peerList = new 
ArrayList<PeerConnection>(peerConnections.values());
+        Collections.shuffle(peerList);
+
+        for (PeerConnection pc : peerList) {
+            futures.add(executor.submit(new PeerFlushExecutor(pc)));    
         }
-        while (true) {
-            synchronized (waitingInMain) {
-                for (PeerThread pt : peerThreads.values()) {
-                    if (pt.getNotDoneState() && pt.getFlushState()) {
-                        try {
-                            waitingInMain.wait(MAX_MESSAGE_HOLDING_MSECS);
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("flush: main waking up");
-                            }
-                            context.progress();
-                        } catch (InterruptedException e) {
-                            // continue;
-                        }
-                    }
-                }
-                boolean stillFlushing = false;
-                for (Map.Entry<InetSocketAddress, PeerThread> entry :
-                        peerThreads.entrySet()) {
-                    if (!entry.getValue().getNotDoneState()) {
-                        throw new RuntimeException(
-                            "flush: peer thread " + entry.getKey() +
-                            " disappeared");
-                    }
-                    if (entry.getValue().getFlushState()) {
-                        stillFlushing = true; // still flushing
-                    }
-                }
-                if (!stillFlushing) {
-                    break;
-                }
+
+        // wait for all flushes
+        for (Future<?> future : futures) {
+            try {
+                future.get();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
             }
         }
+
         long msgs = totalMsgsSentInSuperstep;
         totalMsgsSentInSuperstep = 0;
         return msgs;

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/MsgList.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/MsgList.java?rev=1179029&r1=1179028&r2=1179029&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/MsgList.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/MsgList.java Wed Oct  5 00:41:27 2011
@@ -35,6 +35,10 @@ public class MsgList<M extends Writable>
     public MsgList() {
         super();
     }
+    
+    public MsgList(MsgList<M> msgList) {
+        super(msgList);
+    }
 
     @SuppressWarnings("unchecked")
     @Override

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1179029&r1=1179028&r2=1179029&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java Wed Oct  5 00:41:27 2011
@@ -144,6 +144,9 @@ public class GiraphJob extends Job {
     public static final String MSG_SIZE = "giraph.msgSize";
     /** Default maximum number of messages per peer before flush */
     public static final int MSG_SIZE_DEFAULT = 1000;
+    
+    /** Number of flush threads per peer */
+    public static final String MSG_NUM_FLUSH_THREADS = 
"giraph.msgNumFlushThreads";
 
     /** Number of poll attempts prior to failing the job (int) */
     public static final String POLL_ATTEMPTS = "giraph.pollAttempts";


Reply via email to