Author: hyunsik
Date: Wed Oct 5 00:41:27 2011
New Revision: 1179029
URL: http://svn.apache.org/viewvc?rev=1179029&view=rev
Log:
GIRAPH-12: Investigate communication improvements (hyunsik)
Modified:
incubator/giraph/trunk/CHANGELOG
incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/MsgList.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
Modified: incubator/giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1179029&r1=1179028&r2=1179029&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Wed Oct 5 00:41:27 2011
@@ -2,6 +2,8 @@ Giraph Change Log
Release 0.70.0 - unreleased
+ GIRAPH-12: Investigate communication improvements. (hyunsik)
+
GIRAPH-46: Race condition on superstep 1 with RPC servers not
started by the time that requests are sent. (aching)
Modified:
incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java?rev=1179029&r1=1179028&r2=1179029&view=diff
==============================================================================
---
incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
(original)
+++
incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
Wed Oct 5 00:41:27 2011
@@ -40,7 +40,7 @@ import java.util.Random;
*/
public class RandomMessageBenchmark extends
Vertex<LongWritable, DoubleWritable, DoubleWritable, BytesWritable>
- implements Tool {
+ implements Tool {
/** Configuration from Configurable */
private Configuration conf;
@@ -121,6 +121,10 @@ public class RandomMessageBenchmark exte
"edgesPerVertex",
true,
"Edges per vertex");
+ options.addOption("f",
+ "flusher",
+ true,
+ "Number of flush threads");
HelpFormatter formatter = new HelpFormatter();
if (args.length == 0) {
@@ -160,6 +164,7 @@ public class RandomMessageBenchmark exte
}
int workers = Integer.parseInt(cmd.getOptionValue('w'));
GiraphJob job = new GiraphJob(getConf(), getClass().getName());
+ job.getConfiguration().setInt(GiraphJob.CHECKPOINT_FREQUENCY, 0);
job.setVertexClass(getClass());
job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
job.setWorkerConfiguration(workers, workers, 100.0f);
@@ -187,6 +192,10 @@ public class RandomMessageBenchmark exte
getConf().setInt(SUPERSTEP_COUNT,
Integer.parseInt(cmd.getOptionValue('s')));
}
+ if (cmd.hasOption('f')) {
+ job.getConfiguration().setInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
+ Integer.parseInt(cmd.getOptionValue('f')));
+ }
if (job.run(isVerbose) == true) {
return 0;
} else {
Modified:
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java?rev=1179029&r1=1179028&r2=1179029&view=diff
==============================================================================
---
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java
(original)
+++
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/ArrayListWritable.java
Wed Oct 5 00:41:27 2011
@@ -47,6 +47,10 @@ public abstract class ArrayListWritable<
*/
public ArrayListWritable() {
}
+
+ public ArrayListWritable(ArrayListWritable<M> arrayListWritable) {
+ super(arrayListWritable);
+ }
/**
* This constructor allows setting the refClass during construction.
Modified:
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1179029&r1=1179028&r2=1179029&view=diff
==============================================================================
---
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
(original)
+++
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
Wed Oct 5 00:41:27 2011
@@ -25,6 +25,8 @@ import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -33,6 +35,9 @@ import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.log4j.Logger;
@@ -69,8 +74,6 @@ public abstract class BasicRPCCommunicat
/** Class logger */
private static final Logger LOG =
Logger.getLogger(BasicRPCCommunications.class);
- /** Synchronization object (Between this object and peer threads) */
- private Object waitingInMain = new Object();
/** Indicates whether in superstep preparation */
private boolean inPrepareSuperstep = false;
/** Local hostname */
@@ -90,10 +93,15 @@ public abstract class BasicRPCCommunicat
/** Messages sent during the last superstep */
private long totalMsgsSentInSuperstep = 0;
/**
- * Map of threads mapping from remote socket address to RPC client threads
+ * Map of the peer connections, mapping from remote socket address to
client
+ * meta data
*/
- private final Map<InetSocketAddress, PeerThread> peerThreads =
- new HashMap<InetSocketAddress, PeerThread>();
+ private final Map<InetSocketAddress, PeerConnection> peerConnections =
+ new HashMap<InetSocketAddress, PeerConnection>();
+ /**
+ * Thread pool for message flush threads
+ */
+ private final ExecutorService executor;
/**
* Map of outbound messages, mapping from remote server to
* destination vertex index to list of messages
@@ -137,285 +145,177 @@ public abstract class BasicRPCCommunicat
new HashMap<I, InetSocketAddress>();
/** Maximum size of cached message list, before sending it out */
private final int maxSize;
- /** Maximum msecs to hold messages before checking again */
- private static final int MAX_MESSAGE_HOLDING_MSECS = 2000;
/** Cached job id */
private final String jobId;
/** cached job token */
private final J jobToken;
/** maximum number of vertices sent in a single RPC */
private static final int MAX_VERTICES_PER_RPC = 1024;
-
+
/**
- * Class describing the RPC client thread for every remote RPC server.
- * It actually marshals and ships the RPC call to the remote RPC server
- * for actual execution.
+ * PeerConnection contains RPC client and accumulated messages
+ * for a specific peer.
*/
- private class PeerThread extends Thread {
+ private class PeerConnection {
/**
* Map of outbound messages going to a particular remote server,
* mapping from vertex range (max vertex index) to list of messages.
* (Synchronized with itself).
*/
- private final Map<I, MsgList<M>> outMessagesPerPeer;
+ private final Map<I, MsgList<M>> outMessagesPerPeer;
/**
* Client interface: RPC proxy for remote server, this class for local
*/
private final CommunicationsInterface<I, V, E, M> peer;
/** Maximum size of cached message list, before sending it out */
- private final int maxSize;
/** Boolean, set to false when local client (self), true otherwise */
private final boolean isProxy;
- /** Boolean, set to true when all messages should be flushed */
- private boolean flush = false;
- /** associated synchronization object */
- private final Object flushObject = new Object();
- /** Boolean, set to true when client should terminate */
- private boolean notDone = true;
- /** associated synchronization object */
- private final Object notDoneObject = new Object();
- /** Boolean, set to true when there is a large message list to flush */
- private boolean flushLargeMsgLists = false;
- /** associated synchronization object */
- private final Object flushLargeMsgListsObject = new Object();
- /** Synchronization object */
- private final Object waitingInPeer = new Object();
- /** Combiner instance, can be null */
- private final VertexCombiner<I, M> combiner;
- /** set of keys of large message list (synchronized with itself) */
- private final Set<I> largeMsgListKeys = new TreeSet<I>();
-
- PeerThread(Map<I, MsgList<M>> m,
- CommunicationsInterface<I, V, E, M> i,
- int maxSize,
- boolean isProxy,
- VertexCombiner<I, M> combiner) {
- super(PeerThread.class.getName());
+
+ public PeerConnection(Map<I, MsgList<M>> m,
+ CommunicationsInterface<I, V, E, M> i,
+ boolean isProxy) {
+
this.outMessagesPerPeer = m;
this.peer = i;
- this.maxSize = maxSize;
this.isProxy = isProxy;
- this.combiner = combiner;
- }
-
- public void flushLargeMsgList(I key) {
- synchronized (largeMsgListKeys) {
- largeMsgListKeys.add(key);
- }
- synchronized (flushLargeMsgListsObject) {
- flushLargeMsgLists = true;
- }
- synchronized (waitingInPeer) {
- waitingInPeer.notify();
- }
- }
-
- /**
- * Notify this thread to send issue the put() RPCs.
- */
- public void flush() {
- synchronized (flushObject) {
- flush = true;
- }
- synchronized (waitingInPeer) {
- waitingInPeer.notify();
- }
- }
-
- public boolean getFlushState() {
- synchronized(flushObject) {
- return flush;
- }
- }
-
- public boolean getNotDoneState() {
- synchronized(notDoneObject) {
- return notDone;
- }
}
-
- private boolean getFlushMsgListsState() {
- synchronized (flushLargeMsgListsObject) {
- return flushLargeMsgLists;
- }
- }
-
+
public void close() {
if (LOG.isDebugEnabled()) {
LOG.debug("close: Done");
}
- synchronized (notDoneObject) {
- notDone = false;
- }
- synchronized (waitingInPeer) {
- waitingInPeer.notify();
- }
}
public CommunicationsInterface<I, V, E, M> getRPCProxy() {
return peer;
}
+ }
+
+ private class PeerFlushExecutor implements Runnable {
+ PeerConnection peerConnection;
- /**
- * Issue all the RPC put() to the peer (local or remote) for normal
- * messages.
- *
- * @throws IOException
- */
- private void putAllMessages() throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("putAllMessages: " + peer.getName() +
- ": issuing RPCs");
- }
- synchronized (outMessagesPerPeer) {
- for (Entry<I, MsgList<M>> e :
- outMessagesPerPeer.entrySet()) {
- MsgList<M> msgList = e.getValue();
- if (msgList.size() > 0) {
- if (msgList.size() > 1) {
- if (combiner != null) {
- M combinedMsg = combiner.combine(e.getKey(),
- msgList);
- if (combinedMsg != null) {
- peer.putMsg(e.getKey(), combinedMsg);
- }
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("putAllMessages: " +
- peer.getName() +
- " putting (list) " + msgList +
- " to " + e.getKey() +
- ", proxy = " + isProxy);
- }
- peer.putMsgList(e.getKey(), msgList);
- }
- msgList.clear();
- } else {
- for (M msg : msgList) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("putAllMessages: "
- + peer.getName() +
- " putting " + msg +
- " to " + e.getKey() +
- ", proxy = " + isProxy);
- }
- if (msg == null) {
- throw new IllegalArgumentException(
- "putAllMessages: Cannot put " +
- "null message on " + e.getKey());
- }
- peer.putMsg(e.getKey(), msg);
- }
- msgList.clear();
- }
- }
- }
- }
+ PeerFlushExecutor(PeerConnection peerConnection) {
+ this.peerConnection = peerConnection;
}
@Override
- public void run() {
+ public void run() {
+ CommunicationsInterface<I, V, E, M> proxy
+ = peerConnection.getRPCProxy();
+
try {
- while (true) {
- Set<I> largeMsgListKeysValue = null;
- boolean flushValue = getFlushState();
- boolean notDoneValue = getNotDoneState();
- boolean flushLargeMsgListsValues = getFlushMsgListsState();
- while (notDoneValue && !flushValue) {
- try {
- synchronized (waitingInPeer) {
- waitingInPeer.wait(MAX_MESSAGE_HOLDING_MSECS);
- }
- } catch (InterruptedException e) {
- // continue;
- }
- if (flushLargeMsgListsValues) {
- synchronized (largeMsgListKeys) {
- largeMsgListKeysValue =
- new TreeSet<I>(largeMsgListKeys);
- largeMsgListKeys.clear();
- }
- synchronized (flushLargeMsgListsObject) {
- flushLargeMsgLists = false;
- }
- LOG.info("run: " + peer.getName() +
- ": flushLargeMsgLists " +
- largeMsgListKeysValue.size());
- break;
- }
- flushValue = getFlushState();
- notDoneValue = getNotDoneState();
- flushLargeMsgLists = getFlushMsgListsState();
- }
- if (!notDoneValue) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("run: NotDone=" + notDone +
- ", flush=" + flush);
- }
- break;
- }
+ synchronized (peerConnection.outMessagesPerPeer) {
+ for (Entry<I, MsgList<M>> e :
+ peerConnection.outMessagesPerPeer.entrySet()) {
+ MsgList<M> msgList = e.getValue();
- if (flushValue) {
- putAllMessages();
- if (LOG.isDebugEnabled()) {
- LOG.debug("run: " + peer.getName() +
- ": all messages flushed");
- }
- synchronized (flushObject) {
- flush = false;
- }
- synchronized (flushLargeMsgListsObject) {
- flushLargeMsgLists = false;
- }
- synchronized (waitingInMain) {
- waitingInMain.notify();
- }
- }
- else if (largeMsgListKeysValue != null &&
- largeMsgListKeysValue.size() > 0) {
- for (I destVertex : largeMsgListKeysValue) {
- MsgList<M> msgList = null;
- synchronized(outMessagesPerPeer) {
- msgList = outMessagesPerPeer.get(destVertex);
- if (msgList == null ||
- msgList.size() <= maxSize) {
- continue;
- }
+ if (msgList.size() > 0) {
+ if (msgList.size() > 1) {
if (combiner != null) {
- M combinedMsg =
combiner.combine(destVertex,
- msgList);
+ M combinedMsg =
combiner.combine(e.getKey(),
+ msgList);
if (combinedMsg != null) {
- peer.putMsg(destVertex, combinedMsg);
+ proxy.putMsg(e.getKey(), combinedMsg);
}
} else {
- peer.putMsgList(destVertex, msgList);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("putAllMessages: " +
+ proxy.getName() +
+ " putting (list) " + msgList +
+ " to " + e.getKey() +
+ ", proxy = " +
+ peerConnection.isProxy);
+ }
+ proxy.putMsgList(e.getKey(), msgList);
+ }
+ msgList.clear();
+ } else {
+ for (M msg : msgList) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("putAllMessages: "
+ + proxy.getName() +
+ " putting " + msg +
+ " to " + e.getKey() +
+ ", proxy = " +
+ peerConnection.isProxy);
+ }
+ if (msg == null) {
+ throw new IllegalArgumentException(
+ "putAllMessages: Cannot put " +
+ "null message on " +
e.getKey());
+ }
+ proxy.putMsg(e.getKey(), msg);
}
msgList.clear();
}
}
}
}
- if (isProxy) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("run: RPC client thread terminating...");
- }
- RPC.stopProxy(peer);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("run: " + proxy.getName() +
+ ": all messages flushed");
}
} catch (IOException e) {
- LOG.error(e);
- synchronized (notDoneObject) {
- notDone = false;
+ LOG.error(e);
+ if (peerConnection.isProxy) {
+ RPC.stopProxy(peerConnection.peer);
}
- synchronized (waitingInMain) {
- waitingInMain.notify();
- }
- if (isProxy) {
- RPC.stopProxy(peer);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * LargeMessageFlushExecutor flushes all outgoing messages destined to
some vertices.
+ * This is executed when the number of messages destined to certain vertex
+ * exceeds <i>maxSize</i>.
+ */
+ private class LargeMessageFlushExecutor implements Runnable {
+ final I destVertex;
+ final MsgList<M> outMessage;
+ PeerConnection peerConnection;
+
+ LargeMessageFlushExecutor(PeerConnection peerConnection, I destVertex)
{
+ this.peerConnection = peerConnection;
+ synchronized (peerConnection.outMessagesPerPeer) {
+ this.destVertex = destVertex;
+ outMessage = peerConnection.outMessagesPerPeer.get(destVertex);
+ peerConnection.outMessagesPerPeer.put(destVertex, new
MsgList<M>());
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ CommunicationsInterface<I, V, E, M> proxy =
+ peerConnection.getRPCProxy();
+
+ if (combiner != null) {
+ M combinedMsg = combiner.combine(destVertex,
+ outMessage);
+ if (combinedMsg != null) {
+ proxy.putMsg(destVertex, combinedMsg);
+ }
+ } else {
+ proxy.putMsgList(destVertex, outMessage);
+ }
+ } catch (IOException e) {
+ LOG.error(e);
+ if (peerConnection.isProxy) {
+ RPC.stopProxy(peerConnection.peer);
}
throw new RuntimeException(e);
+ } finally {
+ outMessage.clear();
}
}
}
+
+ private void submitLargeMessageSend(InetSocketAddress addr, I destVertex) {
+ PeerConnection pc = peerConnections.get(addr);
+ executor.execute(new LargeMessageFlushExecutor(pc, destVertex));
+ }
protected abstract J createJobToken() throws IOException;
@@ -467,11 +367,20 @@ public abstract class BasicRPCCommunicat
this.server.start();
this.myName = myAddress.toString();
+
+ int numWorkers = conf.getInt(GiraphJob.MAX_WORKERS, numTasks);
+ // if the number of flush threads is unset, it is set to
+ // the number of max workers.
+ int numFlushThreads = conf.getInt(GiraphJob.MSG_NUM_FLUSH_THREADS,
numWorkers-1);
+ this.executor = Executors.newFixedThreadPool(numFlushThreads);
+
if (LOG.isInfoEnabled()) {
LOG.info("BasicRPCCommunications: Started RPC " +
"communication server: " + myName + " with " +
- numHandlers + " handlers");
+ numHandlers + " handlers and " + numFlushThreads +
+ " flush threads");
}
+
connectAllRPCProxys(this.jobId, this.jobToken);
}
@@ -558,10 +467,9 @@ public abstract class BasicRPCCommunicat
outMessages.put(addrUnresolved, outMsgMap);
}
- PeerThread peerThread =
- new PeerThread(outMsgMap, peer, maxSize, isProxy, combiner);
- peerThread.start();
- peerThreads.put(addrUnresolved, peerThread);
+ PeerConnection peerConnection =
+ new PeerConnection(outMsgMap, peer, isProxy);
+ peerConnections.put(addrUnresolved, peerConnection);
}
@Override
@@ -581,16 +489,8 @@ end[HADOOP_FACEBOOK]*/
@Override
public void closeConnections() throws IOException {
- for (PeerThread pt : peerThreads.values()) {
- pt.close();
- }
-
- for (PeerThread pt : peerThreads.values()) {
- try {
- pt.join();
- } catch (InterruptedException e) {
- LOG.warn(e.getStackTrace());
- }
+ for(PeerConnection pc : peerConnections.values()) {
+ pc.close();
}
}
@@ -748,13 +648,14 @@ end[HADOOP_FACEBOOK]*/
new VertexList<I, V, E, M>();
InetSocketAddress addr = getInetSocketAddress(vertexIndexMax);
CommunicationsInterface<I, V, E, M> rpcProxy =
- peerThreads.get(addr).getRPCProxy();
+ peerConnections.get(addr).getRPCProxy();
+
if (LOG.isInfoEnabled()) {
LOG.info("sendVertexList: Sending to " + rpcProxy.getName() + " " +
addr + ", with vertex index " + vertexIndexMax +
", list " + vertexList);
}
- if (peerThreads.get(addr).isProxy == false) {
+ if(peerConnections.get(addr).isProxy == false) {
throw new RuntimeException("sendVertexList: Impossible to send " +
"to self for vertex index max " + vertexIndexMax);
}
@@ -839,7 +740,7 @@ end[HADOOP_FACEBOOK]*/
msgList.size());
}
if (msgList.size() > maxSize) {
- peerThreads.get(addr).flushLargeMsgList(destVertex);
+ submitLargeMessageSend(addr, destVertex);
}
}
}
@@ -853,7 +754,7 @@ end[HADOOP_FACEBOOK]*/
destVertex + " with address " + addr);
}
CommunicationsInterface<I, V, E, M> rpcProxy =
- peerThreads.get(addr).getRPCProxy();
+ peerConnections.get(addr).getRPCProxy();
rpcProxy.addEdge(destVertex, edge);
}
@@ -866,7 +767,7 @@ end[HADOOP_FACEBOOK]*/
") from" + vertexIndex + " with address " + addr);
}
CommunicationsInterface<I, V, E, M> rpcProxy =
- peerThreads.get(addr).getRPCProxy();
+ peerConnections.get(addr).getRPCProxy();
rpcProxy.removeEdge(vertexIndex, destVertexIndex);
}
@@ -879,7 +780,7 @@ end[HADOOP_FACEBOOK]*/
" with address " + addr);
}
CommunicationsInterface<I, V, E, M> rpcProxy =
- peerThreads.get(addr).getRPCProxy();
+ peerConnections.get(addr).getRPCProxy();
rpcProxy.addVertex(vertex);
}
@@ -892,7 +793,7 @@ end[HADOOP_FACEBOOK]*/
+ vertexIndex + ") with address " + addr);
}
CommunicationsInterface<I, V, E, M> rpcProxy =
- peerThreads.get(addr).getRPCProxy();
+ peerConnections.get(addr).getRPCProxy();
rpcProxy.removeVertex(vertexIndex);
}
@@ -904,41 +805,25 @@ end[HADOOP_FACEBOOK]*/
for (List<M> msgList : inMessages.values()) {
msgList.clear();
}
- for (PeerThread pt : peerThreads.values()) {
- pt.flush();
+ Collection<Future<?>> futures = new ArrayList<Future<?>>();
+
+ // randomize peers in order to avoid hotspot on racks
+ List<PeerConnection> peerList = new
ArrayList<PeerConnection>(peerConnections.values());
+ Collections.shuffle(peerList);
+
+ for (PeerConnection pc : peerList) {
+ futures.add(executor.submit(new PeerFlushExecutor(pc)));
}
- while (true) {
- synchronized (waitingInMain) {
- for (PeerThread pt : peerThreads.values()) {
- if (pt.getNotDoneState() && pt.getFlushState()) {
- try {
- waitingInMain.wait(MAX_MESSAGE_HOLDING_MSECS);
- if (LOG.isDebugEnabled()) {
- LOG.debug("flush: main waking up");
- }
- context.progress();
- } catch (InterruptedException e) {
- // continue;
- }
- }
- }
- boolean stillFlushing = false;
- for (Map.Entry<InetSocketAddress, PeerThread> entry :
- peerThreads.entrySet()) {
- if (!entry.getValue().getNotDoneState()) {
- throw new RuntimeException(
- "flush: peer thread " + entry.getKey() +
- " disappeared");
- }
- if (entry.getValue().getFlushState()) {
- stillFlushing = true; // still flushing
- }
- }
- if (!stillFlushing) {
- break;
- }
+
+ // wait for all flushes
+ for (Future<?> future : futures) {
+ try {
+ future.get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
+
long msgs = totalMsgsSentInSuperstep;
totalMsgsSentInSuperstep = 0;
return msgs;
Modified:
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/MsgList.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/MsgList.java?rev=1179029&r1=1179028&r2=1179029&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/MsgList.java
(original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/MsgList.java
Wed Oct 5 00:41:27 2011
@@ -35,6 +35,10 @@ public class MsgList<M extends Writable>
public MsgList() {
super();
}
+
+ public MsgList(MsgList<M> msgList) {
+ super(msgList);
+ }
@SuppressWarnings("unchecked")
@Override
Modified:
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java?rev=1179029&r1=1179028&r2=1179029&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
(original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GiraphJob.java
Wed Oct 5 00:41:27 2011
@@ -144,6 +144,9 @@ public class GiraphJob extends Job {
public static final String MSG_SIZE = "giraph.msgSize";
/** Default maximum number of messages per peer before flush */
public static final int MSG_SIZE_DEFAULT = 1000;
+
+ /** Number of flush threads per peer */
+ public static final String MSG_NUM_FLUSH_THREADS =
"giraph.msgNumFlushThreads";
/** Number of poll attempts prior to failing the job (int) */
public static final String POLL_ATTEMPTS = "giraph.pollAttempts";