Author: aching
Date: Wed Dec 14 19:06:10 2011
New Revision: 1214406
URL: http://svn.apache.org/viewvc?rev=1214406&view=rev
Log:
GIRAPH-104: Save half of maximum memory used from messaging. (aching)
Added:
incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java
Modified:
incubator/giraph/trunk/CHANGELOG
incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java
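The memory saving in the log message comes mostly from the BasicRPCCommunications changes below: PeerFlushExecutor now drains the per-peer outbound map one destination vertex at a time, removing each entry as soon as its messages are sent, so flushed lists become garbage while the flush is still in progress instead of all being held until the end of the superstep. A minimal standalone sketch of that iterate-and-remove pattern (the types and the commented send() call are placeholders, not the real Giraph generics or RPC interface):

    import java.util.List;
    import java.util.Map;

    public class DrainSketch {
        /** Sketch only: drain an outbound map entry-by-entry so each sent
         *  list can be collected immediately. */
        static <I, M> void drainAndSend(Map<I, List<M>> outMessages) {
            synchronized (outMessages) {
                while (!outMessages.isEmpty()) {
                    Map.Entry<I, List<M>> entry =
                        outMessages.entrySet().iterator().next();
                    // send(entry.getKey(), entry.getValue());  // deliver this batch
                    entry.getValue().clear();            // drop the payload
                    outMessages.remove(entry.getKey());  // entry is collectible now
                }
            }
        }
    }
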
Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1214406&r1=1214405&r2=1214406&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Wed Dec 14 19:06:10 2011
@@ -2,6 +2,8 @@ Giraph Change Log
Release 0.70.0 - unreleased
+ GIRAPH-104: Save half of maximum memory used from messaging. (aching)
+
GIRAPH-10: Aggregators are not exported. (claudio)
GIRAPH-100: GIRAPH-100 - Data input sampling and testing
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java?rev=1214406&r1=1214405&r2=1214406&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java Wed Dec 14 19:06:10 2011
@@ -23,52 +23,253 @@ import org.apache.commons.cli.CommandLin
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
+import org.apache.giraph.examples.LongSumAggregator;
+import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.GiraphJob;
-import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.WorkerContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-
+import org.apache.log4j.Logger;
import java.util.Iterator;
import java.util.Random;
/**
- * Random Message Benchmark for evaluating the message delivery feature
+ * Random Message Benchmark for evaluating the messaging performance.
*/
-public class RandomMessageBenchmark extends
- Vertex<LongWritable, DoubleWritable, DoubleWritable, BytesWritable>
- implements Tool {
+public class RandomMessageBenchmark implements Tool {
/** Configuration from Configurable */
private Configuration conf;
/** How many supersteps to run */
- public static String SUPERSTEP_COUNT =
+ public static final String SUPERSTEP_COUNT =
"RandomMessageBenchmark.superstepCount";
-
/** How many bytes per message */
- public static String NUM_BYTES_PER_MESSAGE =
+ public static final String NUM_BYTES_PER_MESSAGE =
"RandomMessageBenchmark.numBytesPerMessage";
- /** How many bytes per message */
- public static String NUM_MESSAGES_PER_VERTEX =
- "RandomMessageBenchmark.numMessagesPerVertex";
+ /** Default bytes per message */
+ public static final int DEFAULT_NUM_BYTES_PER_MESSAGE = 16;
+ /** How many messages per edge */
+ public static final String NUM_MESSAGES_PER_EDGE =
+ "RandomMessageBenchmark.numMessagesPerEdge";
+ /** Default messages per edge */
+ public static final int DEFAULT_NUM_MESSAGES_PER_EDGE = 1;
+ /** All bytes sent during this superstep */
+ public static final String AGG_SUPERSTEP_TOTAL_BYTES =
+ "superstep total bytes sent";
+ /** All bytes sent during this application */
+ public static final String AGG_TOTAL_BYTES = "total bytes sent";
+ /** All messages during this superstep */
+ public static final String AGG_SUPERSTEP_TOTAL_MESSAGES =
+ "superstep total messages";
+ /** All messages during this application */
+ public static final String AGG_TOTAL_MESSAGES = "total messages";
+ /** All millis during this superstep */
+ public static final String AGG_SUPERSTEP_TOTAL_MILLIS =
+ "superstep total millis";
+ /** All millis during this application */
+ public static final String AGG_TOTAL_MILLIS = "total millis";
+ /** Workers for that superstep */
+ public static final String WORKERS = "workers";
+
+ /**
+ * {@link WorkerContext} for RandomMessageBenchmark.
+ */
+ private static class RandomMessageBenchmarkWorkerContext extends
+ WorkerContext {
+ /** Bytes to be sent */
+ private byte[] messageBytes;
+ /** Number of messages sent per edge */
+ private int numMessagesPerEdge = -1;
+ /** Number of supersteps */
+ private int numSupersteps = -1;
+ /** Random generator for random bytes message */
+ private final Random random = new Random(System.currentTimeMillis());
+ /** Start superstep millis */
+ private long startSuperstepMillis = 0;
+ /** Total bytes */
+ private long totalBytes = 0;
+ /** Total messages */
+ private long totalMessages = 0;
+ /** Total millis */
+ private long totalMillis = 0;
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(RandomMessageBenchmarkWorkerContext.class);
+
+ @Override
+ public void preApplication()
+ throws InstantiationException, IllegalAccessException {
+ messageBytes =
+ new byte[getContext().getConfiguration().
+ getInt(NUM_BYTES_PER_MESSAGE,
+ DEFAULT_NUM_BYTES_PER_MESSAGE)];
+ numMessagesPerEdge =
+ getContext().getConfiguration().
+ getInt(NUM_MESSAGES_PER_EDGE,
+ DEFAULT_NUM_MESSAGES_PER_EDGE);
+ numSupersteps = getContext().getConfiguration().
+ getInt(SUPERSTEP_COUNT, -1);
+ registerAggregator(AGG_SUPERSTEP_TOTAL_BYTES,
+ LongSumAggregator.class);
+ registerAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES,
+ LongSumAggregator.class);
+ registerAggregator(AGG_SUPERSTEP_TOTAL_MILLIS,
+ LongSumAggregator.class);
+ registerAggregator(WORKERS,
+ LongSumAggregator.class);
+ }
+
+ @Override
+ public void preSuperstep() {
+ LongSumAggregator superstepBytesAggregator =
+ (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
+ LongSumAggregator superstepMessagesAggregator =
+ (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
+ LongSumAggregator superstepMillisAggregator =
+ (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
+ LongSumAggregator workersAggregator =
+ (LongSumAggregator) getAggregator(WORKERS);
+
+ // For timing and tracking the supersteps
+ // - superstep 0 starts the time, but cannot display any stats
+ // since nothing has been aggregated yet
+ // - supersteps > 0 can display the stats
+ if (getSuperstep() == 0) {
+ startSuperstepMillis = System.currentTimeMillis();
+ } else {
+ totalBytes +=
+ superstepBytesAggregator.getAggregatedValue().get();
+ totalMessages +=
+ superstepMessagesAggregator.getAggregatedValue().get();
+ totalMillis +=
+ superstepMillisAggregator.getAggregatedValue().get();
+ double superstepMegabytesPerSecond =
+ superstepBytesAggregator.getAggregatedValue().get() *
+ workersAggregator.getAggregatedValue().get() *
+ 1000d / 1024d / 1024d /
+ superstepMillisAggregator.getAggregatedValue().get();
+ double megabytesPerSecond = totalBytes *
+ workersAggregator.getAggregatedValue().get() *
+ 1000d / 1024d / 1024d / totalMillis;
+ double superstepMessagesPerSecond =
+ superstepMessagesAggregator.getAggregatedValue().get() *
+ workersAggregator.getAggregatedValue().get() * 1000d /
+ superstepMillisAggregator.getAggregatedValue().get();
+ double messagesPerSecond = totalMessages *
+ workersAggregator.getAggregatedValue().get() * 1000d /
+ totalMillis;
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Outputing statistics for superstep " +
+ getSuperstep());
+ LOG.info(AGG_SUPERSTEP_TOTAL_BYTES + " : " +
+ superstepBytesAggregator.getAggregatedValue());
+ LOG.info(AGG_TOTAL_BYTES + " : " + totalBytes);
+ LOG.info(AGG_SUPERSTEP_TOTAL_MESSAGES + " : " +
+ superstepMessagesAggregator.getAggregatedValue());
+ LOG.info(AGG_TOTAL_MESSAGES + " : " + totalMessages);
+ LOG.info(AGG_SUPERSTEP_TOTAL_MILLIS + " : " +
+ superstepMillisAggregator.getAggregatedValue());
+ LOG.info(AGG_TOTAL_MILLIS + " : " + totalMillis);
+ LOG.info(WORKERS + " : " +
+ workersAggregator.getAggregatedValue());
+ LOG.info("Superstep megabytes / second = " +
+ superstepMegabytesPerSecond);
+ LOG.info("Total megabytes / second = " +
+ megabytesPerSecond);
+ LOG.info("Superstep messages / second = " +
+ superstepMessagesPerSecond);
+ LOG.info("Total messages / second = " +
+ messagesPerSecond);
+ LOG.info("Superstep megabytes / second / worker = " +
+ superstepMegabytesPerSecond /
+ workersAggregator.getAggregatedValue().get());
+ LOG.info("Total megabytes / second / worker = " +
+ megabytesPerSecond /
+ workersAggregator.getAggregatedValue().get());
+ LOG.info("Superstep messages / second / worker = " +
+ superstepMessagesPerSecond /
+ workersAggregator.getAggregatedValue().get());
+ LOG.info("Total messages / second / worker = " +
+ messagesPerSecond /
+ workersAggregator.getAggregatedValue().get());
+ }
+ }
+
+ superstepBytesAggregator.setAggregatedValue(
+ new LongWritable(0L));
+ superstepMessagesAggregator.setAggregatedValue(
+ new LongWritable(0L));
+ workersAggregator.setAggregatedValue(
+ new LongWritable(1L));
+ useAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
+ useAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
+ useAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
+ useAggregator(WORKERS);
+ }
- /** Random generator for random bytes message */
- private Random rnd = new Random(System.currentTimeMillis());
+ @Override
+ public void postSuperstep() {
+ LongSumAggregator superstepMillisAggregator =
+ (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
+ long endSuperstepMillis = System.currentTimeMillis();
+ long superstepMillis = endSuperstepMillis - startSuperstepMillis;
+ startSuperstepMillis = endSuperstepMillis;
+ superstepMillisAggregator.setAggregatedValue(
+ new LongWritable(superstepMillis));
+ }
- @Override
- public void compute(Iterator<BytesWritable> msgIterator) {
- byte [] message = new byte[getConf().getInt(NUM_BYTES_PER_MESSAGE, 16)];
- int numMessage = getConf().getInt(NUM_MESSAGES_PER_VERTEX, 1);
- if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, -1)) {
- for (int i=0; i < numMessage; i++) {
- rnd.nextBytes(message);
- sendMsgToAllEdges(new BytesWritable(message));
+ @Override
+ public void postApplication() {}
+
+ public byte[] getMessageBytes() {
+ return messageBytes;
+ }
+
+ public int getNumMessagePerEdge() {
+ return numMessagesPerEdge;
+ }
+
+ public int getNumSupersteps() {
+ return numSupersteps;
+ }
+
+ public void randomizeMessageBytes() {
+ random.nextBytes(messageBytes);
+ }
+ }
+
+ /**
+ * Actual message computation (messaging in this case)
+ */
+ public static class RandomMessageVertex extends EdgeListVertex<
+ LongWritable, DoubleWritable, DoubleWritable, BytesWritable> {
+
+ @Override
+ public void compute(Iterator<BytesWritable> msgIterator) {
+ RandomMessageBenchmarkWorkerContext workerContext =
+ (RandomMessageBenchmarkWorkerContext) getWorkerContext();
+ LongSumAggregator superstepBytesAggregator =
+ (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
+ LongSumAggregator superstepMessagesAggregator =
+ (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
+ if (getSuperstep() < workerContext.getNumSupersteps()) {
+ for (int i = 0; i < workerContext.getNumMessagePerEdge();
+ i++) {
+ workerContext.randomizeMessageBytes();
+ sendMsgToAllEdges(
+ new BytesWritable(workerContext.getMessageBytes()));
+ long bytesSent = workerContext.getMessageBytes().length *
+ getNumOutEdges();
+ superstepBytesAggregator.aggregate(bytesSent);
+ superstepMessagesAggregator.aggregate(getNumOutEdges());
+ }
+ } else {
+ voteToHalt();
}
- } else {
- voteToHalt();
}
}
@@ -155,8 +356,9 @@ public class RandomMessageBenchmark exte
int workers = Integer.parseInt(cmd.getOptionValue('w'));
GiraphJob job = new GiraphJob(getConf(), getClass().getName());
job.getConfiguration().setInt(GiraphJob.CHECKPOINT_FREQUENCY, 0);
- job.setVertexClass(getClass());
+ job.setVertexClass(RandomMessageVertex.class);
job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
+ job.setWorkerContextClass(RandomMessageBenchmarkWorkerContext.class);
job.setWorkerConfiguration(workers, workers, 100.0f);
job.getConfiguration().setLong(
PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
@@ -171,7 +373,7 @@ public class RandomMessageBenchmark exte
RandomMessageBenchmark.NUM_BYTES_PER_MESSAGE,
Integer.parseInt(cmd.getOptionValue('b')));
job.getConfiguration().setInt(
- RandomMessageBenchmark.NUM_MESSAGES_PER_VERTEX,
+ RandomMessageBenchmark.NUM_MESSAGES_PER_EDGE,
Integer.parseInt(cmd.getOptionValue('n')));
boolean isVerbose = false;
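For reference, the superstep throughput logged by RandomMessageBenchmarkWorkerContext.preSuperstep() above combines the three per-superstep aggregators with the worker count. A condensed restatement of that arithmetic with illustrative numbers (not a new API, just the same expression pulled out of the hunk):

    // Restates superstepMegabytesPerSecond; sample values are made up.
    long superstepBytes  = 64L * 1024 * 1024;  // bytes aggregated over all workers
    long superstepMillis = 4000;               // wall millis aggregated over all workers
    long workers         = 4;
    double superstepMBps =
        superstepBytes * workers * 1000d / 1024d / 1024d / superstepMillis;
    // 64 MB * 4 * 1000 / 4000 ms = 64.0 MB/s for these inputs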
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1214406&r1=1214405&r2=1214406&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Wed Dec 14 19:06:10 2011
@@ -48,8 +48,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -57,6 +57,7 @@ import java.util.concurrent.Future;
import org.apache.giraph.graph.WorkerInfo;
import org.apache.giraph.graph.partition.Partition;
import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.utils.MemoryUtils;
/*if[HADOOP_FACEBOOK]
import org.apache.hadoop.ipc.ProtocolSignature;
@@ -142,7 +143,7 @@ public abstract class BasicRPCCommunicat
*/
private final Map<I, VertexMutations<I, V, E, M>>
inVertexMutationsMap =
- new TreeMap<I, VertexMutations<I, V, E, M>>();
+ new HashMap<I, VertexMutations<I, V, E, M>>();
/** Maximum size of cached message list, before sending it out */
private final int maxSize;
@@ -160,7 +161,7 @@ public abstract class BasicRPCCommunicat
private class PeerConnection {
/**
* Map of outbound messages going to a particular remote server,
- * mapping from vertex range (max vertex index) to list of messages.
+ * mapping from the destination vertex to a list of messages.
* (Synchronized with itself).
*/
private final Map<I, MsgList<M>> outMessagesPerPeer;
@@ -168,7 +169,6 @@ public abstract class BasicRPCCommunicat
* Client interface: RPC proxy for remote server, this class for local
*/
private final CommunicationsInterface<I, V, E, M> peer;
- /** Maximum size of cached message list, before sending it out */
/** Boolean, set to false when local client (self), true otherwise */
private final boolean isProxy;
@@ -190,11 +190,18 @@ public abstract class BasicRPCCommunicat
public CommunicationsInterface<I, V, E, M> getRPCProxy() {
return peer;
}
+
+ @Override
+ public String toString() {
+ return peer.getName() + ", proxy=" + isProxy;
+ }
}
private class PeerFlushExecutor implements Runnable {
private final PeerConnection peerConnection;
private final Mapper<?, ?, ?, ?>.Context context;
+ // Report on the status of this flusher if this interval was exceeded
+ private static final int REPORTING_INTERVAL_MIN_MILLIS = 60000;
PeerFlushExecutor(PeerConnection peerConnection,
Mapper<?, ?, ?, ?>.Context context) {
@@ -206,11 +213,17 @@ public abstract class BasicRPCCommunicat
public void run() {
CommunicationsInterface<I, V, E, M> proxy
= peerConnection.getRPCProxy();
+ long startMillis = System.currentTimeMillis();
+ long lastReportedMillis = startMillis;
try {
+ int verticesDone = 0;
synchronized (peerConnection.outMessagesPerPeer) {
- for (Entry<I, MsgList<M>> e :
- peerConnection.outMessagesPerPeer.entrySet()) {
+ final int vertices =
+ peerConnection.outMessagesPerPeer.size();
+ while (!peerConnection.outMessagesPerPeer.isEmpty()) {
+ Entry<I, MsgList<M>> e =
+ peerConnection.outMessagesPerPeer.entrySet().iterator().next();
MsgList<M> msgList = e.getValue();
if (msgList.size() > 0) {
@@ -222,27 +235,10 @@ public abstract class BasicRPCCommunicat
proxy.putMsg(e.getKey(), combinedMsg);
}
} else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("putAllMessages: " +
- proxy.getName() +
- " putting (list) " + msgList +
- " to " + e.getKey() +
- ", proxy = " +
- peerConnection.isProxy);
- }
proxy.putMsgList(e.getKey(), msgList);
}
- msgList.clear();
} else {
for (M msg : msgList) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("putAllMessages: "
- + proxy.getName() +
- " putting " + msg +
- " to " + e.getKey() +
- ", proxy = " +
- peerConnection.isProxy);
- }
if (msg == null) {
throw new IllegalArgumentException(
"putAllMessages: Cannot put " +
@@ -251,7 +247,38 @@ public abstract class BasicRPCCommunicat
proxy.putMsg(e.getKey(), msg);
context.progress();
}
- msgList.clear();
+ }
+ msgList.clear();
+ }
+
+ // Clean up the memory with the message list
+ msgList = null;
+ peerConnection.outMessagesPerPeer.remove(e.getKey());
+ e = null;
+
+ ++verticesDone;
+ long curMillis = System.currentTimeMillis();
+ if ((lastReportedMillis +
+ REPORTING_INTERVAL_MIN_MILLIS) < curMillis) {
+ lastReportedMillis = curMillis;
+ if (LOG.isInfoEnabled()) {
+ float percentDone =
+ (100f * verticesDone) /
+ vertices;
+ float minutesUsed =
+ (curMillis - startMillis) / 1000f / 60f;
+ float minutesRemaining =
+ (minutesUsed * 100f / percentDone) -
+ minutesUsed;
+ LOG.info("run: " + peerConnection + ", " +
+ verticesDone + " out of " +
+ vertices +
+ " done in " + minutesUsed +
+ " minutes, " +
+ percentDone + "% done, ETA " +
+ minutesRemaining +
+ " minutes remaining, " +
+ MemoryUtils.getRuntimeMemoryStats());
}
}
}
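The new progress report above assumes flushing proceeds at a roughly constant rate and extrapolates a linear ETA from the fraction of destination vertices already drained. A small worked restatement (values are made up):

    int   verticesDone = 250, vertices = 1000;              // 25% of entries flushed
    float minutesUsed  = 2.0f;                               // elapsed since flush start
    float percentDone  = (100f * verticesDone) / vertices;   // 25.0
    float minutesRemaining =
        (minutesUsed * 100f / percentDone) - minutesUsed;    // 8.0 - 2.0 = 6.0 minutes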
@@ -297,14 +324,14 @@ public abstract class BasicRPCCommunicat
peerConnection.getRPCProxy();
if (combiner != null) {
- M combinedMsg = combiner.combine(destVertex,
- outMessage);
- if (combinedMsg != null) {
- proxy.putMsg(destVertex, combinedMsg);
- }
- } else {
- proxy.putMsgList(destVertex, outMessage);
+ M combinedMsg = combiner.combine(destVertex,
+ outMessage);
+ if (combinedMsg != null) {
+ proxy.putMsg(destVertex, combinedMsg);
}
+ } else {
+ proxy.putMsgList(destVertex, outMessage);
+ }
} catch (IOException e) {
LOG.error(e);
if (peerConnection.isProxy) {
@@ -560,7 +587,7 @@ end[HADOOP_FACEBOOK]*/
synchronized(transientInMessages) {
msgs = transientInMessages.get(vertex);
if (msgs == null) {
- msgs = new ArrayList<M>();
+ msgs = new ArrayList<M>(msgList.size());
transientInMessages.put(vertex, msgs);
}
}
@@ -833,7 +860,9 @@ end[HADOOP_FACEBOOK]*/
@Override
public long flush(Mapper<?, ?, ?, ?>.Context context) throws IOException {
if (LOG.isInfoEnabled()) {
- LOG.info("flush: starting for superstep " +
service.getSuperstep());
+ LOG.info("flush: starting for superstep " +
+ service.getSuperstep() + " " +
+ MemoryUtils.getRuntimeMemoryStats());
}
for (List<M> msgList : inMessages.values()) {
msgList.clear();
@@ -856,11 +885,20 @@ end[HADOOP_FACEBOOK]*/
try {
future.get();
context.progress();
- } catch (Exception e) {
- throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("flush: Got IOException", e);
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(
+ "flush: Got ExecutionException", e);
}
}
+ if (LOG.isInfoEnabled()) {
+ LOG.info("flush: ended for superstep " +
+ service.getSuperstep() + " " +
+ MemoryUtils.getRuntimeMemoryStats());
+ }
+
long msgs = totalMsgsSentInSuperstep;
totalMsgsSentInSuperstep = 0;
return msgs;
@@ -869,7 +907,9 @@ end[HADOOP_FACEBOOK]*/
@Override
public void prepareSuperstep() {
if (LOG.isInfoEnabled()) {
- LOG.info("prepareSuperstep");
+ LOG.info("prepareSuperstep: Superstep " +
+ service.getSuperstep() + " " +
+ MemoryUtils.getRuntimeMemoryStats());
}
inPrepareSuperstep = true;
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java?rev=1214406&r1=1214405&r2=1214406&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java Wed Dec 14 19:06:10 2011
@@ -26,27 +26,30 @@ import org.apache.giraph.graph.Aggregato
* Aggregator for summing up values.
*/
public class LongSumAggregator implements Aggregator<LongWritable> {
+ /** Internal sum */
+ private long sum = 0;
- private long sum = 0;
-
- public void aggregate(long value) {
- sum += value;
- }
-
- public void aggregate(LongWritable value) {
- sum += value.get();
- }
-
- public void setAggregatedValue(LongWritable value) {
- sum = value.get();
- }
-
- public LongWritable getAggregatedValue() {
- return new LongWritable(sum);
- }
-
- public LongWritable createAggregatedValue() {
- return new LongWritable();
- }
-
+ public void aggregate(long value) {
+ sum += value;
+ }
+
+ @Override
+ public void aggregate(LongWritable value) {
+ sum += value.get();
+ }
+
+ @Override
+ public void setAggregatedValue(LongWritable value) {
+ sum = value.get();
+ }
+
+ @Override
+ public LongWritable getAggregatedValue() {
+ return new LongWritable(sum);
+ }
+
+ @Override
+ public LongWritable createAggregatedValue() {
+ return new LongWritable();
+ }
}
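The reformatted LongSumAggregator is the aggregator the benchmark above leans on: a WorkerContext registers it once per worker, resets and announces it before each superstep, and vertices add to it during compute(). A stripped-down, hypothetical WorkerContext showing that cycle (class name and aggregator key are illustrative, not part of this commit):

    import org.apache.giraph.examples.LongSumAggregator;
    import org.apache.giraph.graph.WorkerContext;
    import org.apache.hadoop.io.LongWritable;

    public class BytesSentWorkerContext extends WorkerContext {
        /** Illustrative aggregator key */
        public static final String AGG_BYTES = "bytes sent";

        @Override
        public void preApplication()
            throws InstantiationException, IllegalAccessException {
            registerAggregator(AGG_BYTES, LongSumAggregator.class);  // once per worker
        }

        @Override
        public void preSuperstep() {
            LongSumAggregator bytes = (LongSumAggregator) getAggregator(AGG_BYTES);
            bytes.setAggregatedValue(new LongWritable(0L));  // reset before vertices run
            useAggregator(AGG_BYTES);                        // announce for this superstep
        }

        @Override
        public void postSuperstep() {}

        @Override
        public void postApplication() {}
    }

A vertex would then call ((LongSumAggregator) getAggregator(AGG_BYTES)).aggregate(n) from compute(), exactly as RandomMessageVertex does with its superstep byte and message counters.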
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1214406&r1=1214405&r2=1214406&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Dec 14 19:06:10 2011
@@ -29,6 +29,7 @@ import org.apache.giraph.graph.partition
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.giraph.graph.partition.PartitionStats;
import org.apache.giraph.graph.partition.WorkerGraphPartitioner;
+import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.WritableUtils;
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
@@ -471,9 +472,7 @@ public class BspServiceWorker<
String status = "readVerticesFromInputSplit: Loaded " +
totalVerticesLoaded + " vertices and " +
totalEdgesLoaded + " edges " +
- ", totalMem = " + Runtime.getRuntime().totalMemory() +
- " maxMem =" + Runtime.getRuntime().maxMemory() +
- " freeMem=" + Runtime.getRuntime().freeMemory() + " " +
+ MemoryUtils.getRuntimeMemoryStats() + " " +
getGraphMapper().getMapFunctions().toString() +
" - Attempt=" + getApplicationAttempt() +
", Superstep=" + getSuperstep();
@@ -899,11 +898,13 @@ public class BspServiceWorker<
//
// Master will coordinate the barriers and aggregate "doneness" of all
// the vertices. Each worker will:
- // 1. Save aggregator values that are in use.
- // 2. Report the statistics (vertices, edges, messages, etc.)
- // of this worker
- // 3. Let the master know it is finished.
- // 4. Then it waits for the master to say whether to stop or not.
+ // 1. Flush the unsent messages
+ // 2. Execute user postSuperstep() if necessary.
+ // 3. Save aggregator values that are in use.
+ // 4. Report the statistics (vertices, edges, messages, etc.)
+ // of this worker
+ // 5. Let the master know it is finished.
+ // 6. Wait for the master's global stats, and check if done
long workerSentMessages = 0;
try {
workerSentMessages = commService.flush(getContext());
@@ -911,6 +912,17 @@ public class BspServiceWorker<
throw new IllegalStateException(
"finishSuperstep: flush failed", e);
}
+
+ if (getSuperstep() != INPUT_SUPERSTEP) {
+ getWorkerContext().postSuperstep();
+ getContext().progress();
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("finishSuperstep: Superstep " + getSuperstep() + " " +
+ MemoryUtils.getRuntimeMemoryStats());
+ }
+
JSONArray aggregatorValueArray =
marshalAggregatorValues(getSuperstep());
Collection<PartitionStats> finalizedPartitionStats =
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1214406&r1=1214405&r2=1214406&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Wed Dec 14 19:06:10 2011
@@ -22,6 +22,7 @@ import org.apache.giraph.bsp.Centralized
import org.apache.giraph.graph.partition.Partition;
import org.apache.giraph.graph.partition.PartitionOwner;
import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.zk.ZooKeeperManager;
import org.apache.hadoop.conf.Configuration;
@@ -530,10 +531,7 @@ public class GraphMapper<I extends Writa
}
if (LOG.isDebugEnabled()) {
- LOG.debug("map: totalMem=" +
- Runtime.getRuntime().totalMemory() +
- " maxMem=" + Runtime.getRuntime().maxMemory() +
- " freeMem=" + Runtime.getRuntime().freeMemory());
+ LOG.debug("map: " + MemoryUtils.getRuntimeMemoryStats());
}
context.progress();
@@ -576,6 +574,8 @@ public class GraphMapper<I extends Writa
basicVertex.getMsgList().iterator();
context.progress();
basicVertex.compute(vertexMsgIt);
+ // Hint to GC to free the messages
+ basicVertex.getMsgList().clear();
}
if (basicVertex.isHalted()) {
partitionStats.incrFinishedVertexCount();
@@ -585,15 +585,6 @@ public class GraphMapper<I extends Writa
}
partitionStatsList.add(partitionStats);
}
-
- serviceWorker.getWorkerContext().postSuperstep();
- context.progress();
- if (LOG.isInfoEnabled()) {
- LOG.info("map: totalMem="
- + Runtime.getRuntime().totalMemory() +
- " maxMem=" + Runtime.getRuntime().maxMemory() +
- " freeMem=" + Runtime.getRuntime().freeMemory());
- }
} while (!serviceWorker.finishSuperstep(partitionStatsList));
if (LOG.isInfoEnabled()) {
LOG.info("map: BSP application done " +
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java?rev=1214406&r1=1214405&r2=1214406&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java Wed Dec 14 19:06:10 2011
@@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce.Mappe
/**
* WorkerContext allows for the execution of user code
* on a per-worker basis. There's one WorkerContext per worker.
- *
*/
@SuppressWarnings("rawtypes")
public abstract class WorkerContext implements AggregatorUsage {
@@ -34,35 +33,35 @@ public abstract class WorkerContext impl
public void setGraphState(GraphState graphState) {
this.graphState = graphState;
}
-
+
/**
* Initialize the WorkerContext.
- * This method is executed once on each Worker before the first
+ * This method is executed once on each Worker before the first
* superstep starts.
- *
- * @throws IllegalAccessException
- * @throws InstantiationException
+ *
+ * @throws IllegalAccessException
+ * @throws InstantiationException
*/
public abstract void preApplication() throws InstantiationException,
IllegalAccessException;
-
+
/**
* Finalize the WorkerContext.
- * This method is executed once on each Worker after the last
+ * This method is executed once on each Worker after the last
* superstep ends.
*/
public abstract void postApplication();
/**
* Execute user code.
- * This method is executed once on each Worker before each
+ * This method is executed once on each Worker before each
* superstep starts.
*/
public abstract void preSuperstep();
-
+
/**
* Execute user code.
- * This method is executed once on each Worker after each
+ * This method is executed once on each Worker after each
* superstep ends.
*/
public abstract void postSuperstep();
@@ -75,7 +74,7 @@ public abstract class WorkerContext impl
public long getSuperstep() {
return graphState.getSuperstep();
}
-
+
/**
* Get the total (all workers) number of vertices that
* existed in the previous superstep.
@@ -85,7 +84,7 @@ public abstract class WorkerContext impl
public long getNumVertices() {
return graphState.getNumVertices();
}
-
+
/**
* Get the total (all workers) number of edges that
* existed in the previous superstep.
@@ -95,7 +94,7 @@ public abstract class WorkerContext impl
public long getNumEdges() {
return graphState.getNumEdges();
}
-
+
/**
* Get the mapper context
*
@@ -104,7 +103,7 @@ public abstract class WorkerContext impl
public Mapper.Context getContext() {
return graphState.getContext();
}
-
+
@Override
public final <A extends Writable> Aggregator<A> registerAggregator(
String name,
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java?rev=1214406&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java Wed Dec 14 19:06:10 2011
@@ -0,0 +1,21 @@
+package org.apache.giraph.utils;
+
+/**
+ * Helper static methods for tracking memory usage.
+ */
+public class MemoryUtils {
+ /**
+ * Get stringified runtime memory stats
+ *
+ * @return String of all Runtime stats.
+ */
+ public static String getRuntimeMemoryStats() {
+ return "totalMem = " +
+ (Runtime.getRuntime().totalMemory() / 1024f / 1024f) +
+ "M, maxMem = " +
+ (Runtime.getRuntime().maxMemory() / 1024f / 1024f) +
+ "M, freeMem = " +
+ (Runtime.getRuntime().freeMemory() / 1024f / 1024f)
+ + "M";
+ }
+}
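
Since the other hunks simply splice MemoryUtils.getRuntimeMemoryStats() into existing log lines, here is a tiny hedged usage example outside Giraph's own classes (the demo class name is made up):

    import org.apache.giraph.utils.MemoryUtils;
    import org.apache.log4j.Logger;

    public class MemoryUtilsDemo {
        private static final Logger LOG = Logger.getLogger(MemoryUtilsDemo.class);

        public static void main(String[] args) {
            // Prints something like:
            // "totalMem = 123.5M, maxMem = 1820.5M, freeMem = 98.25M"
            LOG.info("startup: " + MemoryUtils.getRuntimeMemoryStats());
        }
    }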