Author: aching Date: Wed Dec 14 19:06:10 2011 New Revision: 1214406 URL: http://svn.apache.org/viewvc?rev=1214406&view=rev Log: GIRAPH-104: Save half of maximum memory used from messaging. (aching)
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java Modified: incubator/giraph/trunk/CHANGELOG incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java Modified: incubator/giraph/trunk/CHANGELOG URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1214406&r1=1214405&r2=1214406&view=diff ============================================================================== --- incubator/giraph/trunk/CHANGELOG (original) +++ incubator/giraph/trunk/CHANGELOG Wed Dec 14 19:06:10 2011 @@ -2,6 +2,8 @@ Giraph Change Log Release 0.70.0 - unreleased + GIRAPH-104: Save half of maximum memory used from messaging. (aching) + GIRAPH-10: Aggregators are not exported. 
(claudio) GIRAPH-100: GIRAPH-100 - Data input sampling and testing Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java?rev=1214406&r1=1214405&r2=1214406&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java Wed Dec 14 19:06:10 2011 @@ -23,52 +23,253 @@ import org.apache.commons.cli.CommandLin import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; +import org.apache.giraph.examples.LongSumAggregator; +import org.apache.giraph.graph.EdgeListVertex; import org.apache.giraph.graph.GiraphJob; -import org.apache.giraph.graph.Vertex; +import org.apache.giraph.graph.WorkerContext; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; - +import org.apache.log4j.Logger; import java.util.Iterator; import java.util.Random; /** - * Random Message Benchmark for evaluating the message delivery feature + * Random Message Benchmark for evaluating the messaging performance. 
*/ -public class RandomMessageBenchmark extends -    Vertex<LongWritable, DoubleWritable, DoubleWritable, BytesWritable> -    implements Tool { +public class RandomMessageBenchmark implements Tool { /** Configuration from Configurable */ private Configuration conf; /** How many supersteps to run */ -    public static String SUPERSTEP_COUNT = +    public static final String SUPERSTEP_COUNT = "RandomMessageBenchmark.superstepCount"; -    /** How many bytes per message */ -    public static String NUM_BYTES_PER_MESSAGE = +    public static final String NUM_BYTES_PER_MESSAGE = "RandomMessageBenchmark.numBytesPerMessage"; -    /** How many bytes per message */ -    public static String NUM_MESSAGES_PER_VERTEX = -        "RandomMessageBenchmark.numMessagesPerVertex"; +    /** Default bytes per message */ +    public static final int DEFAULT_NUM_BYTES_PER_MESSAGE = 16; +    /** How many messages per edge */ +    public static final String NUM_MESSAGES_PER_EDGE = +        "RandomMessageBenchmark.numMessagesPerEdge"; +    /** Default messages per edge */ +    public static final int DEFAULT_NUM_MESSAGES_PER_EDGE = 1; +    /** All bytes sent during this superstep */ +    public static final String AGG_SUPERSTEP_TOTAL_BYTES = +        "superstep total bytes sent"; +    /** All bytes sent during this application */ +    public static final String AGG_TOTAL_BYTES = "total bytes sent"; +    /** All messages during this superstep */ +    public static final String AGG_SUPERSTEP_TOTAL_MESSAGES = +        "superstep total messages"; +    /** All messages during this application */ +    public static final String AGG_TOTAL_MESSAGES = "total messages"; +    /** All millis during this superstep */ +    public static final String AGG_SUPERSTEP_TOTAL_MILLIS = +        "superstep total millis"; +    /** All millis during this application */ +    public static final String AGG_TOTAL_MILLIS = "total millis"; +    /** Workers for that superstep */ +    public static final String WORKERS = "workers"; + +    /** +     * {@link WorkerContext} for RandomMessageBenchmark.
+ */ + private static class RandomMessageBenchmarkWorkerContext extends + WorkerContext { + /** Bytes to be sent */ + private byte[] messageBytes; + /** Number of messages sent per edge */ + private int numMessagesPerEdge = -1; + /** Number of supersteps */ + private int numSupersteps = -1; + /** Random generator for random bytes message */ + private final Random random = new Random(System.currentTimeMillis()); + /** Start superstep millis */ + private long startSuperstepMillis = 0; + /** Total bytes */ + private long totalBytes = 0; + /** Total messages */ + private long totalMessages = 0; + /** Total millis */ + private long totalMillis = 0; + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(RandomMessageBenchmarkWorkerContext.class); + + @Override + public void preApplication() + throws InstantiationException, IllegalAccessException { + messageBytes = + new byte[getContext().getConfiguration(). + getInt(NUM_BYTES_PER_MESSAGE, + DEFAULT_NUM_BYTES_PER_MESSAGE)]; + numMessagesPerEdge = + getContext().getConfiguration(). + getInt(NUM_MESSAGES_PER_EDGE, + DEFAULT_NUM_MESSAGES_PER_EDGE); + numSupersteps = getContext().getConfiguration(). 
+ getInt(SUPERSTEP_COUNT, -1); + registerAggregator(AGG_SUPERSTEP_TOTAL_BYTES, + LongSumAggregator.class); + registerAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES, + LongSumAggregator.class); + registerAggregator(AGG_SUPERSTEP_TOTAL_MILLIS, + LongSumAggregator.class); + registerAggregator(WORKERS, + LongSumAggregator.class); + } + + @Override + public void preSuperstep() { + LongSumAggregator superstepBytesAggregator = + (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_BYTES); + LongSumAggregator superstepMessagesAggregator = + (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES); + LongSumAggregator superstepMillisAggregator = + (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MILLIS); + LongSumAggregator workersAggregator = + (LongSumAggregator) getAggregator(WORKERS); + + // For timing and tracking the supersteps + // - superstep 0 starts the time, but cannot display any stats + // since nothing has been aggregated yet + // - supersteps > 0 can display the stats + if (getSuperstep() == 0) { + startSuperstepMillis = System.currentTimeMillis(); + } else { + totalBytes += + superstepBytesAggregator.getAggregatedValue().get(); + totalMessages += + superstepMessagesAggregator.getAggregatedValue().get(); + totalMillis += + superstepMillisAggregator.getAggregatedValue().get(); + double superstepMegabytesPerSecond = + superstepBytesAggregator.getAggregatedValue().get() * + workersAggregator.getAggregatedValue().get() * + 1000d / 1024d / 1024d / + superstepMillisAggregator.getAggregatedValue().get(); + double megabytesPerSecond = totalBytes * + workersAggregator.getAggregatedValue().get() * + 1000d / 1024d / 1024d / totalMillis; + double superstepMessagesPerSecond = + superstepMessagesAggregator.getAggregatedValue().get() * + workersAggregator.getAggregatedValue().get() * 1000d / + superstepMillisAggregator.getAggregatedValue().get(); + double messagesPerSecond = totalMessages * + workersAggregator.getAggregatedValue().get() * 1000d / + totalMillis; + if 
(LOG.isInfoEnabled()) { + LOG.info("Outputing statistics for superstep " + + getSuperstep()); + LOG.info(AGG_SUPERSTEP_TOTAL_BYTES + " : " + + superstepBytesAggregator.getAggregatedValue()); + LOG.info(AGG_TOTAL_BYTES + " : " + totalBytes); + LOG.info(AGG_SUPERSTEP_TOTAL_MESSAGES + " : " + + superstepMessagesAggregator.getAggregatedValue()); + LOG.info(AGG_TOTAL_MESSAGES + " : " + totalMessages); + LOG.info(AGG_SUPERSTEP_TOTAL_MILLIS + " : " + + superstepMillisAggregator.getAggregatedValue()); + LOG.info(AGG_TOTAL_MILLIS + " : " + totalMillis); + LOG.info(WORKERS + " : " + + workersAggregator.getAggregatedValue()); + LOG.info("Superstep megabytes / second = " + + superstepMegabytesPerSecond); + LOG.info("Total megabytes / second = " + + megabytesPerSecond); + LOG.info("Superstep messages / second = " + + superstepMessagesPerSecond); + LOG.info("Total messages / second = " + + messagesPerSecond); + LOG.info("Superstep megabytes / second / worker = " + + superstepMegabytesPerSecond / + workersAggregator.getAggregatedValue().get()); + LOG.info("Total megabytes / second / worker = " + + megabytesPerSecond / + workersAggregator.getAggregatedValue().get()); + LOG.info("Superstep messages / second / worker = " + + superstepMessagesPerSecond / + workersAggregator.getAggregatedValue().get()); + LOG.info("Total messages / second / worker = " + + messagesPerSecond / + workersAggregator.getAggregatedValue().get()); + } + } + + superstepBytesAggregator.setAggregatedValue( + new LongWritable(0L)); + superstepMessagesAggregator.setAggregatedValue( + new LongWritable(0L)); + workersAggregator.setAggregatedValue( + new LongWritable(1L)); + useAggregator(AGG_SUPERSTEP_TOTAL_BYTES); + useAggregator(AGG_SUPERSTEP_TOTAL_MILLIS); + useAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES); + useAggregator(WORKERS); + } - /** Random generator for random bytes message */ - private Random rnd = new Random(System.currentTimeMillis()); + @Override + public void postSuperstep() { + LongSumAggregator 
superstepMillisAggregator = + (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MILLIS); + long endSuperstepMillis = System.currentTimeMillis(); + long superstepMillis = endSuperstepMillis - startSuperstepMillis; + startSuperstepMillis = endSuperstepMillis; + superstepMillisAggregator.setAggregatedValue( + new LongWritable(superstepMillis)); + } - @Override - public void compute(Iterator<BytesWritable> msgIterator) { - byte [] message = new byte[getConf().getInt(NUM_BYTES_PER_MESSAGE, 16)]; - int numMessage = getConf().getInt(NUM_MESSAGES_PER_VERTEX, 1); - if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, -1)) { - for (int i=0; i < numMessage; i++) { - rnd.nextBytes(message); - sendMsgToAllEdges(new BytesWritable(message)); + @Override + public void postApplication() {} + + public byte[] getMessageBytes() { + return messageBytes; + } + + public int getNumMessagePerEdge() { + return numMessagesPerEdge; + } + + public int getNumSupersteps() { + return numSupersteps; + } + + public void randomizeMessageBytes() { + random.nextBytes(messageBytes); + } + } + + /** + * Actual message computation (messaging in this case) + */ + public static class RandomMessageVertex extends EdgeListVertex< + LongWritable, DoubleWritable, DoubleWritable, BytesWritable> { + + @Override + public void compute(Iterator<BytesWritable> msgIterator) { + RandomMessageBenchmarkWorkerContext workerContext = + (RandomMessageBenchmarkWorkerContext) getWorkerContext(); + LongSumAggregator superstepBytesAggregator = + (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_BYTES); + LongSumAggregator superstepMessagesAggregator = + (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES); + if (getSuperstep() < workerContext.getNumSupersteps()) { + for (int i = 0; i < workerContext.getNumMessagePerEdge(); + i++) { + workerContext.randomizeMessageBytes(); + sendMsgToAllEdges( + new BytesWritable(workerContext.getMessageBytes())); + long bytesSent = workerContext.getMessageBytes().length * + 
getNumOutEdges(); + superstepBytesAggregator.aggregate(bytesSent); + superstepMessagesAggregator.aggregate(getNumOutEdges()); + } + } else { + voteToHalt(); } - } else { - voteToHalt(); } } @@ -155,8 +356,9 @@ public class RandomMessageBenchmark exte int workers = Integer.parseInt(cmd.getOptionValue('w')); GiraphJob job = new GiraphJob(getConf(), getClass().getName()); job.getConfiguration().setInt(GiraphJob.CHECKPOINT_FREQUENCY, 0); - job.setVertexClass(getClass()); + job.setVertexClass(RandomMessageVertex.class); job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class); + job.setWorkerContextClass(RandomMessageBenchmarkWorkerContext.class); job.setWorkerConfiguration(workers, workers, 100.0f); job.getConfiguration().setLong( PseudoRandomVertexInputFormat.AGGREGATE_VERTICES, @@ -171,7 +373,7 @@ public class RandomMessageBenchmark exte RandomMessageBenchmark.NUM_BYTES_PER_MESSAGE, Integer.parseInt(cmd.getOptionValue('b'))); job.getConfiguration().setInt( - RandomMessageBenchmark.NUM_MESSAGES_PER_VERTEX, + RandomMessageBenchmark.NUM_MESSAGES_PER_EDGE, Integer.parseInt(cmd.getOptionValue('n'))); boolean isVerbose = false; Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1214406&r1=1214405&r2=1214406&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Wed Dec 14 19:06:10 2011 @@ -48,8 +48,8 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import 
java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -57,6 +57,7 @@ import java.util.concurrent.Future; import org.apache.giraph.graph.WorkerInfo; import org.apache.giraph.graph.partition.Partition; import org.apache.giraph.graph.partition.PartitionOwner; +import org.apache.giraph.utils.MemoryUtils; /*if[HADOOP_FACEBOOK] import org.apache.hadoop.ipc.ProtocolSignature; @@ -142,7 +143,7 @@ public abstract class BasicRPCCommunicat */ private final Map<I, VertexMutations<I, V, E, M>> inVertexMutationsMap = - new TreeMap<I, VertexMutations<I, V, E, M>>(); + new HashMap<I, VertexMutations<I, V, E, M>>(); /** Maximum size of cached message list, before sending it out */ private final int maxSize; @@ -160,7 +161,7 @@ public abstract class BasicRPCCommunicat private class PeerConnection { /** * Map of outbound messages going to a particular remote server, - * mapping from vertex range (max vertex index) to list of messages. + * mapping from the destination vertex to a list of messages. * (Synchronized with itself). 
*/ private final Map<I, MsgList<M>> outMessagesPerPeer; @@ -168,7 +169,6 @@ public abstract class BasicRPCCommunicat * Client interface: RPC proxy for remote server, this class for local */ private final CommunicationsInterface<I, V, E, M> peer; - /** Maximum size of cached message list, before sending it out */ /** Boolean, set to false when local client (self), true otherwise */ private final boolean isProxy; @@ -190,11 +190,18 @@ public abstract class BasicRPCCommunicat public CommunicationsInterface<I, V, E, M> getRPCProxy() { return peer; } + + @Override + public String toString() { + return peer.getName() + ", proxy=" + isProxy; + } } private class PeerFlushExecutor implements Runnable { private final PeerConnection peerConnection; private final Mapper<?, ?, ?, ?>.Context context; + // Report on the status of this flusher if this interval was exceeded + private static final int REPORTING_INTERVAL_MIN_MILLIS = 60000; PeerFlushExecutor(PeerConnection peerConnection, Mapper<?, ?, ?, ?>.Context context) { @@ -206,11 +213,17 @@ public abstract class BasicRPCCommunicat public void run() { CommunicationsInterface<I, V, E, M> proxy = peerConnection.getRPCProxy(); + long startMillis = System.currentTimeMillis(); + long lastReportedMillis = startMillis; try { + int verticesDone = 0; synchronized (peerConnection.outMessagesPerPeer) { - for (Entry<I, MsgList<M>> e : - peerConnection.outMessagesPerPeer.entrySet()) { + final int vertices = + peerConnection.outMessagesPerPeer.size(); + while (!peerConnection.outMessagesPerPeer.isEmpty()) { + Entry<I, MsgList<M>> e = + peerConnection.outMessagesPerPeer.entrySet().iterator().next(); MsgList<M> msgList = e.getValue(); if (msgList.size() > 0) { @@ -222,27 +235,10 @@ public abstract class BasicRPCCommunicat proxy.putMsg(e.getKey(), combinedMsg); } } else { - if (LOG.isDebugEnabled()) { - LOG.debug("putAllMessages: " + - proxy.getName() + - " putting (list) " + msgList + - " to " + e.getKey() + - ", proxy = " + - 
peerConnection.isProxy); - } proxy.putMsgList(e.getKey(), msgList); } - msgList.clear(); } else { for (M msg : msgList) { - if (LOG.isDebugEnabled()) { - LOG.debug("putAllMessages: " - + proxy.getName() + - " putting " + msg + - " to " + e.getKey() + - ", proxy = " + - peerConnection.isProxy); - } if (msg == null) { throw new IllegalArgumentException( "putAllMessages: Cannot put " + @@ -251,7 +247,38 @@ public abstract class BasicRPCCommunicat proxy.putMsg(e.getKey(), msg); context.progress(); } - msgList.clear(); + } + msgList.clear(); + } + + // Clean up the memory with the message list + msgList = null; + peerConnection.outMessagesPerPeer.remove(e.getKey()); + e = null; + + ++verticesDone; + long curMillis = System.currentTimeMillis(); + if ((lastReportedMillis + + REPORTING_INTERVAL_MIN_MILLIS) < curMillis) { + lastReportedMillis = curMillis; + if (LOG.isInfoEnabled()) { + float percentDone = + (100f * verticesDone) / + vertices; + float minutesUsed = + (curMillis - startMillis) / 1000f / 60f; + float minutesRemaining = + (minutesUsed * 100f / percentDone) - + minutesUsed; + LOG.info("run: " + peerConnection + ", " + + verticesDone + " out of " + + vertices + + " done in " + minutesUsed + + " minutes, " + + percentDone + "% done, ETA " + + minutesRemaining + + " minutes remaining, " + + MemoryUtils.getRuntimeMemoryStats()); } } } @@ -297,14 +324,14 @@ public abstract class BasicRPCCommunicat peerConnection.getRPCProxy(); if (combiner != null) { - M combinedMsg = combiner.combine(destVertex, - outMessage); - if (combinedMsg != null) { - proxy.putMsg(destVertex, combinedMsg); - } - } else { - proxy.putMsgList(destVertex, outMessage); + M combinedMsg = combiner.combine(destVertex, + outMessage); + if (combinedMsg != null) { + proxy.putMsg(destVertex, combinedMsg); } + } else { + proxy.putMsgList(destVertex, outMessage); + } } catch (IOException e) { LOG.error(e); if (peerConnection.isProxy) { @@ -560,7 +587,7 @@ end[HADOOP_FACEBOOK]*/ 
synchronized(transientInMessages) { msgs = transientInMessages.get(vertex); if (msgs == null) { - msgs = new ArrayList<M>(); + msgs = new ArrayList<M>(msgList.size()); transientInMessages.put(vertex, msgs); } } @@ -833,7 +860,9 @@ end[HADOOP_FACEBOOK]*/ @Override public long flush(Mapper<?, ?, ?, ?>.Context context) throws IOException { if (LOG.isInfoEnabled()) { - LOG.info("flush: starting for superstep " + service.getSuperstep()); + LOG.info("flush: starting for superstep " + + service.getSuperstep() + " " + + MemoryUtils.getRuntimeMemoryStats()); } for (List<M> msgList : inMessages.values()) { msgList.clear(); @@ -856,11 +885,20 @@ end[HADOOP_FACEBOOK]*/ try { future.get(); context.progress(); - } catch (Exception e) { - throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new IllegalStateException("flush: Got IOException", e); + } catch (ExecutionException e) { + throw new IllegalStateException( + "flush: Got ExecutionException", e); } } + if (LOG.isInfoEnabled()) { + LOG.info("flush: ended for superstep " + + service.getSuperstep() + " " + + MemoryUtils.getRuntimeMemoryStats()); + } + long msgs = totalMsgsSentInSuperstep; totalMsgsSentInSuperstep = 0; return msgs; @@ -869,7 +907,9 @@ end[HADOOP_FACEBOOK]*/ @Override public void prepareSuperstep() { if (LOG.isInfoEnabled()) { - LOG.info("prepareSuperstep"); + LOG.info("prepareSuperstep: Superstep " + + service.getSuperstep() + " " + + MemoryUtils.getRuntimeMemoryStats()); } inPrepareSuperstep = true; Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java?rev=1214406&r1=1214405&r2=1214406&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java (original) +++ 
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java Wed Dec 14 19:06:10 2011 @@ -26,27 +26,30 @@ import org.apache.giraph.graph.Aggregato * Aggregator for summing up values. */ public class LongSumAggregator implements Aggregator<LongWritable> { + /** Internal sum */ + private long sum = 0; - private long sum = 0; - - public void aggregate(long value) { - sum += value; - } - - public void aggregate(LongWritable value) { - sum += value.get(); - } - - public void setAggregatedValue(LongWritable value) { - sum = value.get(); - } - - public LongWritable getAggregatedValue() { - return new LongWritable(sum); - } - - public LongWritable createAggregatedValue() { - return new LongWritable(); - } - + public void aggregate(long value) { + sum += value; + } + + @Override + public void aggregate(LongWritable value) { + sum += value.get(); + } + + @Override + public void setAggregatedValue(LongWritable value) { + sum = value.get(); + } + + @Override + public LongWritable getAggregatedValue() { + return new LongWritable(sum); + } + + @Override + public LongWritable createAggregatedValue() { + return new LongWritable(); + } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1214406&r1=1214405&r2=1214406&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Dec 14 19:06:10 2011 @@ -29,6 +29,7 @@ import org.apache.giraph.graph.partition import org.apache.giraph.graph.partition.PartitionOwner; import org.apache.giraph.graph.partition.PartitionStats; import org.apache.giraph.graph.partition.WorkerGraphPartitioner; +import org.apache.giraph.utils.MemoryUtils; import 
org.apache.giraph.utils.WritableUtils; import org.apache.giraph.zk.BspEvent; import org.apache.giraph.zk.PredicateLock; @@ -471,9 +472,7 @@ public class BspServiceWorker< String status = "readVerticesFromInputSplit: Loaded " + totalVerticesLoaded + " vertices and " + totalEdgesLoaded + " edges " + - ", totalMem = " + Runtime.getRuntime().totalMemory() + - " maxMem =" + Runtime.getRuntime().maxMemory() + - " freeMem=" + Runtime.getRuntime().freeMemory() + " " + + MemoryUtils.getRuntimeMemoryStats() + " " + getGraphMapper().getMapFunctions().toString() + " - Attempt=" + getApplicationAttempt() + ", Superstep=" + getSuperstep(); @@ -899,11 +898,13 @@ public class BspServiceWorker< // // Master will coordinate the barriers and aggregate "doneness" of all // the vertices. Each worker will: - // 1. Save aggregator values that are in use. - // 2. Report the statistics (vertices, edges, messages, etc.) - // of this worker - // 3. Let the master know it is finished. - // 4. Then it waits for the master to say whether to stop or not. + // 1. Flush the unsent messages + // 2. Execute user postSuperstep() if necessary. + // 3. Save aggregator values that are in use. + // 4. Report the statistics (vertices, edges, messages, etc.) + // of this worker + // 5. Let the master know it is finished. + // 6. 
Wait for the master's global stats, and check if done long workerSentMessages = 0; try { workerSentMessages = commService.flush(getContext()); @@ -911,6 +912,17 @@ public class BspServiceWorker< throw new IllegalStateException( "finishSuperstep: flush failed", e); } + + if (getSuperstep() != INPUT_SUPERSTEP) { + getWorkerContext().postSuperstep(); + getContext().progress(); + } + + if (LOG.isInfoEnabled()) { + LOG.info("finishSuperstep: Superstep " + getSuperstep() + " " + + MemoryUtils.getRuntimeMemoryStats()); + } + JSONArray aggregatorValueArray = marshalAggregatorValues(getSuperstep()); Collection<PartitionStats> finalizedPartitionStats = Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1214406&r1=1214405&r2=1214406&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Wed Dec 14 19:06:10 2011 @@ -22,6 +22,7 @@ import org.apache.giraph.bsp.Centralized import org.apache.giraph.graph.partition.Partition; import org.apache.giraph.graph.partition.PartitionOwner; import org.apache.giraph.graph.partition.PartitionStats; +import org.apache.giraph.utils.MemoryUtils; import org.apache.giraph.utils.ReflectionUtils; import org.apache.giraph.zk.ZooKeeperManager; import org.apache.hadoop.conf.Configuration; @@ -530,10 +531,7 @@ public class GraphMapper<I extends Writa } if (LOG.isDebugEnabled()) { - LOG.debug("map: totalMem=" + - Runtime.getRuntime().totalMemory() + - " maxMem=" + Runtime.getRuntime().maxMemory() + - " freeMem=" + Runtime.getRuntime().freeMemory()); + LOG.debug("map: " + MemoryUtils.getRuntimeMemoryStats()); } context.progress(); @@ -576,6 +574,8 @@ public class GraphMapper<I extends Writa 
basicVertex.getMsgList().iterator(); context.progress(); basicVertex.compute(vertexMsgIt); + // Hint to GC to free the messages + basicVertex.getMsgList().clear(); } if (basicVertex.isHalted()) { partitionStats.incrFinishedVertexCount(); @@ -585,15 +585,6 @@ public class GraphMapper<I extends Writa } partitionStatsList.add(partitionStats); } - - serviceWorker.getWorkerContext().postSuperstep(); - context.progress(); - if (LOG.isInfoEnabled()) { - LOG.info("map: totalMem=" - + Runtime.getRuntime().totalMemory() + - " maxMem=" + Runtime.getRuntime().maxMemory() + - " freeMem=" + Runtime.getRuntime().freeMemory()); - } } while (!serviceWorker.finishSuperstep(partitionStatsList)); if (LOG.isInfoEnabled()) { LOG.info("map: BSP application done " + Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java?rev=1214406&r1=1214405&r2=1214406&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java Wed Dec 14 19:06:10 2011 @@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce.Mappe /** * WorkerContext allows for the execution of user code * on a per-worker basis. There's one WorkerContext per worker. - * */ @SuppressWarnings("rawtypes") public abstract class WorkerContext implements AggregatorUsage { @@ -34,35 +33,35 @@ public abstract class WorkerContext impl public void setGraphState(GraphState graphState) { this.graphState = graphState; } - + /** * Initialize the WorkerContext. - * This method is executed once on each Worker before the first + * This method is executed once on each Worker before the first * superstep starts. 
- * - * @throws IllegalAccessException - * @throws InstantiationException + * + * @throws IllegalAccessException + * @throws InstantiationException */ public abstract void preApplication() throws InstantiationException, IllegalAccessException; - + /** * Finalize the WorkerContext. - * This method is executed once on each Worker after the last + * This method is executed once on each Worker after the last * superstep ends. */ public abstract void postApplication(); /** * Execute user code. - * This method is executed once on each Worker before each + * This method is executed once on each Worker before each * superstep starts. */ public abstract void preSuperstep(); - + /** * Execute user code. - * This method is executed once on each Worker after each + * This method is executed once on each Worker after each * superstep ends. */ public abstract void postSuperstep(); @@ -75,7 +74,7 @@ public abstract class WorkerContext impl public long getSuperstep() { return graphState.getSuperstep(); } - + /** * Get the total (all workers) number of vertices that * existed in the previous superstep. @@ -85,7 +84,7 @@ public abstract class WorkerContext impl public long getNumVertices() { return graphState.getNumVertices(); } - + /** * Get the total (all workers) number of edges that * existed in the previous superstep. 
@@ -95,7 +94,7 @@ public abstract class WorkerContext impl public long getNumEdges() { return graphState.getNumEdges(); } - + /** * Get the mapper context * @@ -104,7 +103,7 @@ public abstract class WorkerContext impl public Mapper.Context getContext() { return graphState.getContext(); } - + @Override public final <A extends Writable> Aggregator<A> registerAggregator( String name, Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java?rev=1214406&view=auto ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java (added) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java Wed Dec 14 19:06:10 2011 @@ -0,0 +1,21 @@ +package org.apache.giraph.utils; + +/** + * Helper static methods for tracking memory usage. + */ +public class MemoryUtils { + /** + * Get stringified runtime memory stats + * + * @return String of all Runtime stats. + */ + public static String getRuntimeMemoryStats() { + return "totalMem = " + + (Runtime.getRuntime().totalMemory() / 1024f / 1024f) + + "M, maxMem = " + + (Runtime.getRuntime().maxMemory() / 1024f / 1024f) + + "M, freeMem = " + + (Runtime.getRuntime().freeMemory() / 1024f / 1024f) + + "M"; + } +}