Author: aching
Date: Wed Dec 14 19:06:10 2011
New Revision: 1214406

URL: http://svn.apache.org/viewvc?rev=1214406&view=rev
Log:
GIRAPH-104: Save half of maximum memory used from messaging. (aching)


Added:
    
incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java
Modified:
    incubator/giraph/trunk/CHANGELOG
    
incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
    
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
    
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java
    
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
    
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
    
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1214406&r1=1214405&r2=1214406&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Wed Dec 14 19:06:10 2011
@@ -2,6 +2,8 @@ Giraph Change Log
 
 Release 0.70.0 - unreleased
 
+  GIRAPH-104: Save half of maximum memory used from messaging. (aching)
+
   GIRAPH-10: Aggregators are not exported. (claudio)
 
   GIRAPH-100: GIRAPH-100 - Data input sampling and testing

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java?rev=1214406&r1=1214405&r2=1214406&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/benchmark/RandomMessageBenchmark.java Wed Dec 14 19:06:10 2011
@@ -23,52 +23,253 @@ import org.apache.commons.cli.CommandLin
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
+import org.apache.giraph.examples.LongSumAggregator;
+import org.apache.giraph.graph.EdgeListVertex;
 import org.apache.giraph.graph.GiraphJob;
-import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.WorkerContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-
+import org.apache.log4j.Logger;
 import java.util.Iterator;
 import java.util.Random;
 
 /**
- * Random Message Benchmark for evaluating the message delivery feature
+ * Random Message Benchmark for evaluating the messaging performance.
  */
-public class RandomMessageBenchmark extends
-        Vertex<LongWritable, DoubleWritable, DoubleWritable, BytesWritable>
-        implements Tool {
+public class RandomMessageBenchmark implements Tool {
     /** Configuration from Configurable */
     private Configuration conf;
 
     /** How many supersteps to run */
-    public static String SUPERSTEP_COUNT =
+    public static final String SUPERSTEP_COUNT =
         "RandomMessageBenchmark.superstepCount";
-
     /** How many bytes per message */
-    public static String NUM_BYTES_PER_MESSAGE =
+    public static final String NUM_BYTES_PER_MESSAGE =
         "RandomMessageBenchmark.numBytesPerMessage";
-    /** How many bytes per message */
-    public static String NUM_MESSAGES_PER_VERTEX =
-        "RandomMessageBenchmark.numMessagesPerVertex";
+    /** Default bytes per message */
+    public static final int DEFAULT_NUM_BYTES_PER_MESSAGE = 16;
+    /** How many messages per edge */
+    public static final String NUM_MESSAGES_PER_EDGE=
+        "RandomMessageBenchmark.numMessagesPerEdge";
+    /** Default messages per edge */
+    public static final int DEFAULT_NUM_MESSAGES_PER_EDGE = 1;
+    /** All bytes sent during this superstep */
+    public static final String AGG_SUPERSTEP_TOTAL_BYTES =
+        "superstep total bytes sent";
+    /** All bytes sent during this application */
+    public static final String AGG_TOTAL_BYTES = "total bytes sent";
+    /** All messages during this superstep */
+    public static final String AGG_SUPERSTEP_TOTAL_MESSAGES =
+        "superstep total messages";
+    /** All messages during this application */
+    public static final String AGG_TOTAL_MESSAGES = "total messages";
+    /** All millis during this superstep */
+    public static final String AGG_SUPERSTEP_TOTAL_MILLIS =
+        "superstep total millis";
+    /** All millis during this application */
+    public static final String AGG_TOTAL_MILLIS = "total millis";
+    /** Workers for that superstep */
+    public static final String WORKERS = "workers";
+
+    /**
+     * {@link WorkerContext} for RandomMessageBenchmark.
+     */
+    private static class RandomMessageBenchmarkWorkerContext extends
+            WorkerContext {
+        /** Bytes to be sent */
+        private byte[] messageBytes;
+        /** Number of messages sent per edge */
+        private int numMessagesPerEdge = -1;
+        /** Number of supersteps */
+        private int numSupersteps = -1;
+        /** Random generator for random bytes message */
+        private final Random random = new Random(System.currentTimeMillis());
+        /** Start superstep millis */
+        private long startSuperstepMillis = 0;
+        /** Total bytes */
+        private long totalBytes = 0;
+        /** Total messages */
+        private long totalMessages = 0;
+        /** Total millis */
+        private long totalMillis = 0;
+        /** Class logger */
+        private static final Logger LOG =
+            Logger.getLogger(RandomMessageBenchmarkWorkerContext.class);
+
+        @Override
+        public void preApplication()
+                throws InstantiationException, IllegalAccessException {
+            messageBytes =
+                new byte[getContext().getConfiguration().
+                         getInt(NUM_BYTES_PER_MESSAGE,
+                               DEFAULT_NUM_BYTES_PER_MESSAGE)];
+            numMessagesPerEdge =
+                getContext().getConfiguration().
+                    getInt(NUM_MESSAGES_PER_EDGE,
+                           DEFAULT_NUM_MESSAGES_PER_EDGE);
+            numSupersteps = getContext().getConfiguration().
+                                getInt(SUPERSTEP_COUNT, -1);
+            registerAggregator(AGG_SUPERSTEP_TOTAL_BYTES,
+                LongSumAggregator.class);
+            registerAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES,
+                LongSumAggregator.class);
+            registerAggregator(AGG_SUPERSTEP_TOTAL_MILLIS,
+                LongSumAggregator.class);
+            registerAggregator(WORKERS,
+                LongSumAggregator.class);
+        }
+
+        @Override
+        public void preSuperstep() {
+            LongSumAggregator superstepBytesAggregator =
+                (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
+            LongSumAggregator superstepMessagesAggregator =
+                (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
+            LongSumAggregator superstepMillisAggregator =
+                (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
+            LongSumAggregator workersAggregator =
+                (LongSumAggregator) getAggregator(WORKERS);
+
+            // For timing and tracking the supersteps
+            // - superstep 0 starts the time, but cannot display any stats
+            //   since nothing has been aggregated yet
+            // - supersteps > 0 can display the stats
+            if (getSuperstep() == 0) {
+                startSuperstepMillis = System.currentTimeMillis();
+            } else {
+                totalBytes +=
+                        superstepBytesAggregator.getAggregatedValue().get();
+                totalMessages +=
+                        superstepMessagesAggregator.getAggregatedValue().get();
+                totalMillis +=
+                        superstepMillisAggregator.getAggregatedValue().get();
+                double superstepMegabytesPerSecond =
+                        superstepBytesAggregator.getAggregatedValue().get() *
+                        workersAggregator.getAggregatedValue().get() *
+                        1000d / 1024d / 1024d /
+                        superstepMillisAggregator.getAggregatedValue().get();
+                double megabytesPerSecond = totalBytes *
+                        workersAggregator.getAggregatedValue().get() *
+                        1000d / 1024d / 1024d / totalMillis;
+                double superstepMessagesPerSecond =
+                        superstepMessagesAggregator.getAggregatedValue().get() *
+                        workersAggregator.getAggregatedValue().get() * 1000d /
+                        superstepMillisAggregator.getAggregatedValue().get();
+                double messagesPerSecond = totalMessages *
+                        workersAggregator.getAggregatedValue().get() * 1000d /
+                        totalMillis;
+                if (LOG.isInfoEnabled()) {
+                    LOG.info("Outputing statistics for superstep " +
+                             getSuperstep());
+                    LOG.info(AGG_SUPERSTEP_TOTAL_BYTES + " : " +
+                             superstepBytesAggregator.getAggregatedValue());
+                    LOG.info(AGG_TOTAL_BYTES + " : " + totalBytes);
+                    LOG.info(AGG_SUPERSTEP_TOTAL_MESSAGES + " : " +
+                             superstepMessagesAggregator.getAggregatedValue());
+                    LOG.info(AGG_TOTAL_MESSAGES + " : " + totalMessages);
+                    LOG.info(AGG_SUPERSTEP_TOTAL_MILLIS + " : " +
+                             superstepMillisAggregator.getAggregatedValue());
+                    LOG.info(AGG_TOTAL_MILLIS + " : " + totalMillis);
+                    LOG.info(WORKERS + " : " +
+                             workersAggregator.getAggregatedValue());
+                    LOG.info("Superstep megabytes / second = " +
+                             superstepMegabytesPerSecond);
+                    LOG.info("Total megabytes / second = " +
+                             megabytesPerSecond);
+                    LOG.info("Superstep messages / second = " +
+                             superstepMessagesPerSecond);
+                    LOG.info("Total messages / second = " +
+                             messagesPerSecond);
+                    LOG.info("Superstep megabytes / second / worker = " +
+                             superstepMegabytesPerSecond /
+                             workersAggregator.getAggregatedValue().get());
+                    LOG.info("Total megabytes / second / worker = " +
+                             megabytesPerSecond /
+                             workersAggregator.getAggregatedValue().get());
+                    LOG.info("Superstep messages / second / worker = " +
+                             superstepMessagesPerSecond /
+                             workersAggregator.getAggregatedValue().get());
+                    LOG.info("Total messages / second / worker = " +
+                             messagesPerSecond /
+                             workersAggregator.getAggregatedValue().get());
+                }
+            }
+
+            superstepBytesAggregator.setAggregatedValue(
+                new LongWritable(0L));
+            superstepMessagesAggregator.setAggregatedValue(
+                new LongWritable(0L));
+            workersAggregator.setAggregatedValue(
+                new LongWritable(1L));
+            useAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
+            useAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
+            useAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
+            useAggregator(WORKERS);
+        }
 
-    /** Random generator for random bytes message */
-    private Random rnd = new Random(System.currentTimeMillis());
+        @Override
+        public void postSuperstep() {
+            LongSumAggregator superstepMillisAggregator =
+                (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MILLIS);
+            long endSuperstepMillis = System.currentTimeMillis();
+            long superstepMillis = endSuperstepMillis - startSuperstepMillis;
+            startSuperstepMillis = endSuperstepMillis;
+            superstepMillisAggregator.setAggregatedValue(
+                new LongWritable(superstepMillis));
+        }
 
-    @Override
-    public void compute(Iterator<BytesWritable> msgIterator) {
-        byte [] message = new byte[getConf().getInt(NUM_BYTES_PER_MESSAGE, 16)];
-        int numMessage = getConf().getInt(NUM_MESSAGES_PER_VERTEX, 1);
-        if (getSuperstep() < getConf().getInt(SUPERSTEP_COUNT, -1)) {
-            for (int i=0; i < numMessage; i++) {
-                rnd.nextBytes(message);
-                sendMsgToAllEdges(new BytesWritable(message));
+        @Override
+        public void postApplication() {}
+
+        public byte[] getMessageBytes() {
+            return messageBytes;
+        }
+
+        public int getNumMessagePerEdge() {
+            return numMessagesPerEdge;
+        }
+
+        public int getNumSupersteps() {
+            return numSupersteps;
+        }
+
+        public void randomizeMessageBytes() {
+            random.nextBytes(messageBytes);
+        }
+    }
+
+    /**
+     * Actual message computation (messaging in this case)
+     */
+    public static class RandomMessageVertex extends EdgeListVertex<
+            LongWritable, DoubleWritable, DoubleWritable, BytesWritable> {
+
+        @Override
+        public void compute(Iterator<BytesWritable> msgIterator) {
+            RandomMessageBenchmarkWorkerContext workerContext =
+                (RandomMessageBenchmarkWorkerContext) getWorkerContext();
+            LongSumAggregator superstepBytesAggregator =
+                (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_BYTES);
+            LongSumAggregator superstepMessagesAggregator =
+                (LongSumAggregator) getAggregator(AGG_SUPERSTEP_TOTAL_MESSAGES);
+            if (getSuperstep() < workerContext.getNumSupersteps()) {
+                for (int i = 0; i < workerContext.getNumMessagePerEdge();
+                        i++) {
+                    workerContext.randomizeMessageBytes();
+                    sendMsgToAllEdges(
+                        new BytesWritable(workerContext.getMessageBytes()));
+                    long bytesSent = workerContext.getMessageBytes().length *
+                        getNumOutEdges();
+                    superstepBytesAggregator.aggregate(bytesSent);
+                    superstepMessagesAggregator.aggregate(getNumOutEdges());
+                }
+            } else {
+                voteToHalt();
             }
-        } else {
-            voteToHalt();
         }
     }
 
@@ -155,8 +356,9 @@ public class RandomMessageBenchmark exte
         int workers = Integer.parseInt(cmd.getOptionValue('w'));
         GiraphJob job = new GiraphJob(getConf(), getClass().getName());
         job.getConfiguration().setInt(GiraphJob.CHECKPOINT_FREQUENCY, 0);
-        job.setVertexClass(getClass());
+        job.setVertexClass(RandomMessageVertex.class);
         job.setVertexInputFormatClass(PseudoRandomVertexInputFormat.class);
+        job.setWorkerContextClass(RandomMessageBenchmarkWorkerContext.class);
         job.setWorkerConfiguration(workers, workers, 100.0f);
         job.getConfiguration().setLong(
             PseudoRandomVertexInputFormat.AGGREGATE_VERTICES,
@@ -171,7 +373,7 @@ public class RandomMessageBenchmark exte
             RandomMessageBenchmark.NUM_BYTES_PER_MESSAGE,
             Integer.parseInt(cmd.getOptionValue('b')));
         job.getConfiguration().setInt(
-            RandomMessageBenchmark.NUM_MESSAGES_PER_VERTEX,
+            RandomMessageBenchmark.NUM_MESSAGES_PER_EDGE,
             Integer.parseInt(cmd.getOptionValue('n')));
 
         boolean isVerbose = false;

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java?rev=1214406&r1=1214405&r2=1214406&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/comm/BasicRPCCommunications.java Wed Dec 14 19:06:10 2011
@@ -48,8 +48,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -57,6 +57,7 @@ import java.util.concurrent.Future;
 import org.apache.giraph.graph.WorkerInfo;
 import org.apache.giraph.graph.partition.Partition;
 import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.utils.MemoryUtils;
 
 /*if[HADOOP_FACEBOOK]
 import org.apache.hadoop.ipc.ProtocolSignature;
@@ -142,7 +143,7 @@ public abstract class BasicRPCCommunicat
      */
     private final Map<I, VertexMutations<I, V, E, M>>
         inVertexMutationsMap =
-            new TreeMap<I, VertexMutations<I, V, E, M>>();
+            new HashMap<I, VertexMutations<I, V, E, M>>();
 
     /** Maximum size of cached message list, before sending it out */
     private final int maxSize;
@@ -160,7 +161,7 @@ public abstract class BasicRPCCommunicat
     private class PeerConnection {
         /**
          * Map of outbound messages going to a particular remote server,
-         * mapping from vertex range (max vertex index) to list of messages.
+         * mapping from the destination vertex to a list of messages.
          * (Synchronized with itself).
          */
         private final Map<I, MsgList<M>> outMessagesPerPeer;
@@ -168,7 +169,6 @@ public abstract class BasicRPCCommunicat
          * Client interface: RPC proxy for remote server, this class for local
          */
         private final CommunicationsInterface<I, V, E, M> peer;
-        /** Maximum size of cached message list, before sending it out */
         /** Boolean, set to false when local client (self), true otherwise */
         private final boolean isProxy;
 
@@ -190,11 +190,18 @@ public abstract class BasicRPCCommunicat
         public CommunicationsInterface<I, V, E, M> getRPCProxy() {
             return peer;
         }
+
+        @Override
+        public String toString() {
+            return peer.getName() + ", proxy=" + isProxy;
+        }
     }
 
     private class PeerFlushExecutor implements Runnable {
         private final PeerConnection peerConnection;
         private final Mapper<?, ?, ?, ?>.Context context;
+        // Report on the status of this flusher if this interval was exceeded
+        private static final int REPORTING_INTERVAL_MIN_MILLIS = 60000;
 
         PeerFlushExecutor(PeerConnection peerConnection,
                           Mapper<?, ?, ?, ?>.Context context) {
@@ -206,11 +213,17 @@ public abstract class BasicRPCCommunicat
         public void run() {
             CommunicationsInterface<I, V, E, M> proxy
                 = peerConnection.getRPCProxy();
+            long startMillis = System.currentTimeMillis();
+            long lastReportedMillis = startMillis;
 
             try {
+                int verticesDone = 0;
                 synchronized (peerConnection.outMessagesPerPeer) {
-                    for (Entry<I, MsgList<M>> e :
-                        peerConnection.outMessagesPerPeer.entrySet()) {
+                    final int vertices =
+                        peerConnection.outMessagesPerPeer.size();
+                    while (!peerConnection.outMessagesPerPeer.isEmpty()) {
+                        Entry<I, MsgList<M>> e =
+                            peerConnection.outMessagesPerPeer.entrySet().iterator().next();
                         MsgList<M> msgList = e.getValue();
 
                         if (msgList.size() > 0) {
@@ -222,27 +235,10 @@ public abstract class BasicRPCCommunicat
                                         proxy.putMsg(e.getKey(), combinedMsg);
                                     }
                                 } else {
-                                    if (LOG.isDebugEnabled()) {
-                                        LOG.debug("putAllMessages: " +
-                                            proxy.getName() +
-                                            " putting (list) " + msgList +
-                                            " to " + e.getKey() +
-                                            ", proxy = " +
-                                            peerConnection.isProxy);
-                                    }
                                     proxy.putMsgList(e.getKey(), msgList);
                                 }
-                                msgList.clear();
                             } else {
                                 for (M msg : msgList) {
-                                    if (LOG.isDebugEnabled()) {
-                                        LOG.debug("putAllMessages: "
-                                            + proxy.getName() +
-                                            " putting " + msg +
-                                            " to " + e.getKey() +
-                                            ", proxy = " +
-                                            peerConnection.isProxy);
-                                    }
                                     if (msg == null) {
                                         throw new IllegalArgumentException(
                                             "putAllMessages: Cannot put " +
@@ -251,7 +247,38 @@ public abstract class BasicRPCCommunicat
                                     proxy.putMsg(e.getKey(), msg);
                                     context.progress();
                                 }
-                                msgList.clear();
+                            }
+                            msgList.clear();
+                        }
+
+                        // Clean up the memory with the message list
+                        msgList = null;
+                        peerConnection.outMessagesPerPeer.remove(e.getKey());
+                        e = null;
+
+                        ++verticesDone;
+                        long curMillis = System.currentTimeMillis();
+                        if ((lastReportedMillis +
+                                REPORTING_INTERVAL_MIN_MILLIS) < curMillis) {
+                            lastReportedMillis = curMillis;
+                            if (LOG.isInfoEnabled()) {
+                                float percentDone =
+                                    (100f * verticesDone) /
+                                    vertices;
+                                float minutesUsed =
+                                    (curMillis - startMillis) / 1000f / 60f;
+                                float minutesRemaining =
+                                    (minutesUsed * 100f / percentDone) -
+                                    minutesUsed;
+                                LOG.info("run: " + peerConnection + ", " +
+                                         verticesDone + " out of " +
+                                         vertices  +
+                                         " done in " + minutesUsed +
+                                         " minutes, " +
+                                         percentDone + "% done, ETA " +
+                                         minutesRemaining +
+                                         " minutes remaining, " +
+                                         MemoryUtils.getRuntimeMemoryStats());
                             }
                         }
                     }
@@ -297,14 +324,14 @@ public abstract class BasicRPCCommunicat
                     peerConnection.getRPCProxy();
 
                 if (combiner != null) {
-                        M combinedMsg = combiner.combine(destVertex,
-                                                         outMessage);
-                        if (combinedMsg != null) {
-                            proxy.putMsg(destVertex, combinedMsg);
-                        }
-                    } else {
-                        proxy.putMsgList(destVertex, outMessage);
+                    M combinedMsg = combiner.combine(destVertex,
+                                                     outMessage);
+                    if (combinedMsg != null) {
+                        proxy.putMsg(destVertex, combinedMsg);
                     }
+                } else {
+                    proxy.putMsgList(destVertex, outMessage);
+                }
             } catch (IOException e) {
                 LOG.error(e);
                 if (peerConnection.isProxy) {
@@ -560,7 +587,7 @@ end[HADOOP_FACEBOOK]*/
         synchronized(transientInMessages) {
             msgs = transientInMessages.get(vertex);
             if (msgs == null) {
-                msgs = new ArrayList<M>();
+                msgs = new ArrayList<M>(msgList.size());
                 transientInMessages.put(vertex, msgs);
             }
         }
@@ -833,7 +860,9 @@ end[HADOOP_FACEBOOK]*/
     @Override
     public long flush(Mapper<?, ?, ?, ?>.Context context) throws IOException {
         if (LOG.isInfoEnabled()) {
-            LOG.info("flush: starting for superstep " + service.getSuperstep());
+            LOG.info("flush: starting for superstep " +
+                      service.getSuperstep() + " " +
+                      MemoryUtils.getRuntimeMemoryStats());
         }
         for (List<M> msgList : inMessages.values()) {
             msgList.clear();
@@ -856,11 +885,20 @@ end[HADOOP_FACEBOOK]*/
             try {
                 future.get();
                 context.progress();
-            } catch (Exception e) {
-                throw new RuntimeException(e);
+            } catch (InterruptedException e) {
+                throw new IllegalStateException("flush: Got IOException", e);
+            } catch (ExecutionException e) {
+                throw new IllegalStateException(
+                    "flush: Got ExecutionException", e);
             }
         }
 
+        if (LOG.isInfoEnabled()) {
+            LOG.info("flush: ended for superstep " +
+                      service.getSuperstep() + " " +
+                      MemoryUtils.getRuntimeMemoryStats());
+        }
+
         long msgs = totalMsgsSentInSuperstep;
         totalMsgsSentInSuperstep = 0;
         return msgs;
@@ -869,7 +907,9 @@ end[HADOOP_FACEBOOK]*/
     @Override
     public void prepareSuperstep() {
         if (LOG.isInfoEnabled()) {
-            LOG.info("prepareSuperstep");
+            LOG.info("prepareSuperstep: Superstep " +
+                     service.getSuperstep() + " " +
+                     MemoryUtils.getRuntimeMemoryStats());
         }
         inPrepareSuperstep = true;
 

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java?rev=1214406&r1=1214405&r2=1214406&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/examples/LongSumAggregator.java Wed Dec 14 19:06:10 2011
@@ -26,27 +26,30 @@ import org.apache.giraph.graph.Aggregato
  * Aggregator for summing up values.
  */
 public class LongSumAggregator implements Aggregator<LongWritable> {
+    /** Internal sum */
+    private long sum = 0;
 
-  private long sum = 0;
-
-  public void aggregate(long value) {
-      sum += value;
-  }
-
-  public void aggregate(LongWritable value) {
-      sum += value.get();
-  }
-
-  public void setAggregatedValue(LongWritable value) {
-      sum = value.get();
-  }
-
-  public LongWritable getAggregatedValue() {
-      return new LongWritable(sum);
-  }
-
-  public LongWritable createAggregatedValue() {
-      return new LongWritable();
-  }
-
+    public void aggregate(long value) {
+        sum += value;
+    }
+
+    @Override
+    public void aggregate(LongWritable value) {
+        sum += value.get();
+    }
+
+    @Override
+    public void setAggregatedValue(LongWritable value) {
+        sum = value.get();
+    }
+
+    @Override
+    public LongWritable getAggregatedValue() {
+        return new LongWritable(sum);
+    }
+
+    @Override
+    public LongWritable createAggregatedValue() {
+        return new LongWritable();
+    }
 }

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1214406&r1=1214405&r2=1214406&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Dec 14 19:06:10 2011
@@ -29,6 +29,7 @@ import org.apache.giraph.graph.partition
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.giraph.graph.partition.PartitionStats;
 import org.apache.giraph.graph.partition.WorkerGraphPartitioner;
+import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.giraph.zk.BspEvent;
 import org.apache.giraph.zk.PredicateLock;
@@ -471,9 +472,7 @@ public class BspServiceWorker<
                 String status = "readVerticesFromInputSplit: Loaded " +
                     totalVerticesLoaded + " vertices and " +
                     totalEdgesLoaded + " edges " +
-                    ", totalMem = " + Runtime.getRuntime().totalMemory() +
-                    " maxMem ="  + Runtime.getRuntime().maxMemory() +
-                    " freeMem=" + Runtime.getRuntime().freeMemory() + " " +
+                    MemoryUtils.getRuntimeMemoryStats() + " " +
                     getGraphMapper().getMapFunctions().toString() +
                     " - Attempt=" + getApplicationAttempt() +
                     ", Superstep=" + getSuperstep();
@@ -899,11 +898,13 @@ public class BspServiceWorker<
         //
         // Master will coordinate the barriers and aggregate "doneness" of all
         // the vertices.  Each worker will:
-        // 1. Save aggregator values that are in use.
-        // 2. Report the statistics (vertices, edges, messages, etc.)
-        // of this worker
-        // 3. Let the master know it is finished.
-        // 4. Then it waits for the master to say whether to stop or not.
+        // 1. Flush the unsent messages
+        // 2. Execute user postSuperstep() if necessary.
+        // 3. Save aggregator values that are in use.
+        // 4. Report the statistics (vertices, edges, messages, etc.)
+        //    of this worker
+        // 5. Let the master know it is finished.
+        // 6. Wait for the master's global stats, and check if done
         long workerSentMessages = 0;
         try {
             workerSentMessages = commService.flush(getContext());
@@ -911,6 +912,17 @@ public class BspServiceWorker<
             throw new IllegalStateException(
                 "finishSuperstep: flush failed", e);
         }
+
+        if (getSuperstep() != INPUT_SUPERSTEP) {
+            getWorkerContext().postSuperstep();
+            getContext().progress();
+        }
+
+        if (LOG.isInfoEnabled()) {
+            LOG.info("finishSuperstep: Superstep " + getSuperstep() + " " +
+                      MemoryUtils.getRuntimeMemoryStats());
+        }
+
         JSONArray aggregatorValueArray =
             marshalAggregatorValues(getSuperstep());
         Collection<PartitionStats> finalizedPartitionStats =

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1214406&r1=1214405&r2=1214406&view=diff
==============================================================================
--- 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java 
(original)
+++ 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java 
Wed Dec 14 19:06:10 2011
@@ -22,6 +22,7 @@ import org.apache.giraph.bsp.Centralized
 import org.apache.giraph.graph.partition.Partition;
 import org.apache.giraph.graph.partition.PartitionOwner;
 import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.zk.ZooKeeperManager;
 import org.apache.hadoop.conf.Configuration;
@@ -530,10 +531,7 @@ public class GraphMapper<I extends Writa
             }
 
             if (LOG.isDebugEnabled()) {
-                LOG.debug("map: totalMem=" +
-                          Runtime.getRuntime().totalMemory() +
-                          " maxMem=" + Runtime.getRuntime().maxMemory() +
-                          " freeMem=" + Runtime.getRuntime().freeMemory());
+                LOG.debug("map: " + MemoryUtils.getRuntimeMemoryStats());
             }
             context.progress();
 
@@ -576,6 +574,8 @@ public class GraphMapper<I extends Writa
                             basicVertex.getMsgList().iterator();
                         context.progress();
                         basicVertex.compute(vertexMsgIt);
+                        // Hint to GC to free the messages
+                        basicVertex.getMsgList().clear();
                     }
                     if (basicVertex.isHalted()) {
                         partitionStats.incrFinishedVertexCount();
@@ -585,15 +585,6 @@ public class GraphMapper<I extends Writa
                 }
                 partitionStatsList.add(partitionStats);
             }
-
-            serviceWorker.getWorkerContext().postSuperstep();
-            context.progress();
-            if (LOG.isInfoEnabled()) {
-                LOG.info("map: totalMem="
-                         + Runtime.getRuntime().totalMemory() +
-                         " maxMem=" + Runtime.getRuntime().maxMemory() +
-                         " freeMem=" + Runtime.getRuntime().freeMemory());
-            }
         } while (!serviceWorker.finishSuperstep(partitionStatsList));
         if (LOG.isInfoEnabled()) {
             LOG.info("map: BSP application done " +

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java?rev=1214406&r1=1214405&r2=1214406&view=diff
==============================================================================
--- 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java 
(original)
+++ 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/WorkerContext.java 
Wed Dec 14 19:06:10 2011
@@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce.Mappe
 /**
  * WorkerContext allows for the execution of user code
  * on a per-worker basis. There's one WorkerContext per worker.
- *
  */
 @SuppressWarnings("rawtypes")
 public abstract class WorkerContext implements AggregatorUsage {
@@ -34,35 +33,35 @@ public abstract class WorkerContext impl
        public void setGraphState(GraphState graphState) {
                this.graphState = graphState;
        }
-       
+
     /**
      * Initialize the WorkerContext.
-     * This method is executed once on each Worker before the first 
+     * This method is executed once on each Worker before the first
      * superstep starts.
-     * 
-     * @throws IllegalAccessException 
-     * @throws InstantiationException 
+     *
+     * @throws IllegalAccessException
+     * @throws InstantiationException
      */
        public abstract void preApplication() throws InstantiationException,
                IllegalAccessException;
-       
+
     /**
      * Finalize the WorkerContext.
-     * This method is executed once on each Worker after the last 
+     * This method is executed once on each Worker after the last
      * superstep ends.
      */
     public abstract void postApplication();
 
     /**
      * Execute user code.
-     * This method is executed once on each Worker before each 
+     * This method is executed once on each Worker before each
      * superstep starts.
      */
     public abstract void preSuperstep();
-    
+
     /**
      * Execute user code.
-     * This method is executed once on each Worker after each 
+     * This method is executed once on each Worker after each
      * superstep ends.
      */
     public abstract void postSuperstep();
@@ -75,7 +74,7 @@ public abstract class WorkerContext impl
     public long getSuperstep() {
         return graphState.getSuperstep();
     }
-    
+
     /**
      * Get the total (all workers) number of vertices that
      * existed in the previous superstep.
@@ -85,7 +84,7 @@ public abstract class WorkerContext impl
     public long getNumVertices() {
        return graphState.getNumVertices();
     }
-    
+
     /**
      * Get the total (all workers) number of edges that
      * existed in the previous superstep.
@@ -95,7 +94,7 @@ public abstract class WorkerContext impl
     public long getNumEdges() {
        return graphState.getNumEdges();
     }
-    
+
     /**
      * Get the mapper context
      *
@@ -104,7 +103,7 @@ public abstract class WorkerContext impl
     public Mapper.Context getContext() {
         return graphState.getContext();
     }
-    
+
     @Override
     public final <A extends Writable> Aggregator<A> registerAggregator(
             String name,

Added: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java?rev=1214406&view=auto
==============================================================================
--- 
incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java 
(added)
+++ 
incubator/giraph/trunk/src/main/java/org/apache/giraph/utils/MemoryUtils.java 
Wed Dec 14 19:06:10 2011
@@ -0,0 +1,21 @@
+package org.apache.giraph.utils;
+
+/**
+ * Static helpers that report JVM memory usage as log-friendly strings.
+ */
+public class MemoryUtils {
+    /**
+     * Format the Runtime's total, max and free memory, each in megabytes.
+     *
+     * @return "totalMem = xM, maxMem = yM, freeMem = zM" (float values).
+     */
+    public static String getRuntimeMemoryStats() {
+        return "totalMem = " +
+               (Runtime.getRuntime().totalMemory() / 1024f / 1024f) +
+               "M, maxMem = "  +
+               (Runtime.getRuntime().maxMemory() / 1024f / 1024f) +
+               "M, freeMem = " +
+               (Runtime.getRuntime().freeMemory() / 1024f / 1024f)
+                + "M";
+    }
+}


Reply via email to