Author: tjungblut
Date: Tue Sep 11 09:47:46 2012
New Revision: 1383326
URL: http://svn.apache.org/viewvc?rev=1383326&view=rev
Log:
[HAMA-596]: Optimize memory usage of graph job
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunnerBase.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1383326&r1=1383325&r2=1383326&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Tue Sep 11 09:47:46 2012
@@ -13,6 +13,7 @@ Release 0.6 (unreleased changes)
IMPROVEMENTS
+ HAMA-596: Optimize memory usage of graph job (tjungblut)
HAMA-599: Improvement of network-based runtime partitioner (edwardyoon)
Release 0.5 - April 10, 2012
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1383326&r1=1383325&r2=1383326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Tue Sep
11 09:47:46 2012
@@ -344,10 +344,15 @@ public final class BSPPeerImpl<K1, V1, K
/**
* @return the size of assigned split
*/
+ @Override
public long getSplitSize() {
return splitSize;
}
+ /**
+ * @return the position in the input stream.
+ */
+ @Override
public long getPos() throws IOException {
return in.getPos();
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1383326&r1=1383325&r2=1383326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Tue
Sep 11 09:47:46 2012
@@ -20,11 +20,10 @@ package org.apache.hama.bsp;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.LinkedList;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
@@ -64,9 +63,6 @@ public class LocalBSPRunner implements J
private static String WORKING_DIR = "/tmp/hama-bsp/";
private volatile ThreadPoolExecutor threadPool;
- @SuppressWarnings("rawtypes")
- private static final LinkedList<Future<BSPPeerImpl>> FUTURE_LIST = new LinkedList<Future<BSPPeerImpl>>();
-
private String jobFile;
private String jobName;
@@ -145,16 +141,19 @@ public class LocalBSPRunner implements J
}
threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(numBspTask);
+ @SuppressWarnings("rawtypes")
+ ExecutorCompletionService<BSPPeerImpl> completionService = new ExecutorCompletionService<BSPPeerImpl>(
+ threadPool);
peerNames = new String[numBspTask];
for (int i = 0; i < numBspTask; i++) {
peerNames[i] = "local:" + i;
- FUTURE_LIST.add(threadPool.submit(new BSPRunner(new Configuration(conf),
- job, i, splits)));
+ completionService.submit(new BSPRunner(new Configuration(conf), job, i,
+ splits));
globalCounters.incrCounter(JobInProgress.JobCounter.LAUNCHED_TASKS, 1L);
}
- new Thread(new ThreadObserver(currentJobStatus)).start();
+ new Thread(new ThreadObserver(numBspTask, completionService)).start();
return currentJobStatus;
}
@@ -233,7 +232,6 @@ public class LocalBSPRunner implements J
}
- // deprecated until 0.5.0, then it will be removed.
@SuppressWarnings("unchecked")
public void run() throws Exception {
@@ -287,29 +285,34 @@ public class LocalBSPRunner implements J
}
// this thread observes the status of the runners.
+ @SuppressWarnings("rawtypes")
class ThreadObserver implements Runnable {
- final JobStatus status;
+ private final ExecutorCompletionService<BSPPeerImpl> completionService;
+ private final int numTasks;
+
+ public ThreadObserver(int numTasks,
- public ThreadObserver(JobStatus currentJobStatus) {
- this.status = currentJobStatus;
+ ExecutorCompletionService<BSPPeerImpl> completionService) {
+ this.numTasks = numTasks;
+ this.completionService = completionService;
}
- @SuppressWarnings("rawtypes")
@Override
public void run() {
boolean success = true;
- for (Future<BSPPeerImpl> future : FUTURE_LIST) {
+
+ for (int i = 0; i < numTasks; i++) {
try {
- BSPPeerImpl bspPeerImpl = future.get();
- currentJobStatus.getCounter().incrAllCounters(
- bspPeerImpl.getCounters());
- } catch (InterruptedException e) {
- LOG.error("Exception during BSP execution!", e);
- success = false;
- } catch (ExecutionException e) {
+ Future<BSPPeerImpl> take = completionService.take();
+ if (take != null) {
+ currentJobStatus.getCounter().incrAllCounters(
+ take.get().getCounters());
+ }
+ } catch (Exception e) {
LOG.error("Exception during BSP execution!", e);
success = false;
+ break;
}
}
if (success) {
@@ -321,7 +324,6 @@ public class LocalBSPRunner implements J
}
threadPool.shutdownNow();
}
-
}
public static class LocalMessageManager<M extends Writable> extends
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java?rev=1383326&r1=1383325&r2=1383326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java
Tue Sep 11 09:47:46 2012
@@ -17,10 +17,10 @@
*/
package org.apache.hama.bsp.message;
+import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
-import java.util.LinkedList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
@@ -31,7 +31,7 @@ import org.apache.hama.bsp.TaskAttemptID
*/
public final class MemoryQueue<M extends Writable> implements MessageQueue<M> {
- private final Deque<M> deque = new LinkedList<M>();
+ private final Deque<M> deque = new ArrayDeque<M>();
private Configuration conf;
@Override
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java?rev=1383326&r1=1383325&r2=1383326&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java Tue Sep 11
09:47:46 2012
@@ -27,7 +27,6 @@ public final class Edge<VERTEX_ID extend
private final VERTEX_ID destinationVertexID;
private final EDGE_VALUE_TYPE cost;
- String destinationPeerName;
public Edge(VERTEX_ID sourceVertexID, EDGE_VALUE_TYPE cost) {
this.destinationVertexID = sourceVertexID;
@@ -38,32 +37,16 @@ public final class Edge<VERTEX_ID extend
}
}
- public Edge(VERTEX_ID sourceVertexID, String destinationPeer,
- EDGE_VALUE_TYPE cost) {
- this.destinationVertexID = sourceVertexID;
- destinationPeerName = destinationPeer;
- if (cost instanceof NullWritable) {
- this.cost = null;
- } else {
- this.cost = cost;
- }
- }
-
public VERTEX_ID getDestinationVertexID() {
return destinationVertexID;
}
- public String getDestinationPeerName() {
- return destinationPeerName;
- }
-
public EDGE_VALUE_TYPE getValue() {
return cost;
}
@Override
public String toString() {
- return this.destinationVertexID + ":" + this.getValue() + " (resides on "
- + destinationPeerName + ")";
+ return this.destinationVertexID + ":" + this.getValue();
}
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1383326&r1=1383325&r2=1383326&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
Tue Sep 11 09:47:46 2012
@@ -107,7 +107,6 @@ public final class GraphJobMessage imple
out.writeInt(outEdges.size());
for (Object e : outEdges) {
Edge<?, ?> edge = (Edge<?, ?>) e;
- out.writeUTF(edge.getDestinationPeerName());
edge.getDestinationVertexID().write(out);
if (edge.getValue() != null) {
out.writeBoolean(true);
@@ -136,7 +135,7 @@ public final class GraphJobMessage imple
map = new MapWritable();
map.readFields(in);
} else if (isPartitioningMessage()) {
- Vertex<Writable, Writable, Writable> vertex = GraphJobRunner
+ Vertex<Writable, Writable, Writable> vertex = GraphJobRunnerBase
.newVertexInstance(VERTEX_CLASS, null);
Writable vertexId = ReflectionUtils.newInstance(VERTEX_ID_CLASS, null);
vertexId.readFields(in);
@@ -150,7 +149,6 @@ public final class GraphJobMessage imple
int size = in.readInt();
vertex.setEdges(new ArrayList<Edge<Writable, Writable>>(size));
for (int i = 0; i < size; i++) {
- String destination = in.readUTF();
Writable edgeVertexID = ReflectionUtils.newInstance(VERTEX_ID_CLASS,
null);
edgeVertexID.readFields(in);
@@ -160,7 +158,7 @@ public final class GraphJobMessage imple
edgeValue.readFields(in);
}
vertex.getEdges().add(
- new Edge<Writable, Writable>(edgeVertexID, destination, edgeValue));
+ new Edge<Writable, Writable>(edgeVertexID, edgeValue));
}
this.vertex = vertex;
} else if (isVerticesSizeMessage()) {
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1383326&r1=1383325&r2=1383326&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Tue Sep 11 09:47:46 2012
@@ -48,6 +48,7 @@ public final class GraphJobRunner<V exte
public final void setup(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException, SyncException, InterruptedException {
+ this.peer = peer;
this.conf = peer.getConfiguration();
// Choose one as a master to collect global updates
this.masterTask = peer.getPeerName(0);
@@ -116,8 +117,9 @@ public final class GraphJobRunner<V exte
.newInstance(conf.getClass(GraphJob.VERTEX_GRAPH_INPUT_READER,
VertexInputReader.class), conf);
- loadVertices(peer, repairNeeded, runtimePartitioning, partitioner, reader, this);
-
+ loadVertices(peer, repairNeeded, runtimePartitioning, partitioner, reader,
+ this);
+
for (String peerName : peer.getAllPeerNames()) {
peer.send(peerName, new GraphJobMessage(new IntWritable(vertices.size())));
}
@@ -130,7 +132,7 @@ public final class GraphJobRunner<V exte
numberVertices += msg.getVerticesSize().get();
}
}
-
+
// TODO refactor this to a single step
for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) {
LinkedList<M> msgIterator = new LinkedList<M>();
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunnerBase.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunnerBase.java?rev=1383326&r1=1383325&r2=1383326&view=diff
==============================================================================
---
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunnerBase.java
(original)
+++
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunnerBase.java
Tue Sep 11 09:47:46 2012
@@ -89,6 +89,8 @@ public abstract class GraphJobRunnerBase
protected Class<E> edgeValueClass;
protected Class<Vertex<V, E, M>> vertexClass;
+ protected BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
+
@SuppressWarnings("unchecked")
protected void loadVertices(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
@@ -107,7 +109,6 @@ public abstract class GraphJobRunnerBase
LOG.debug("vertex class: " + vertexClass);
boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
Vertex<V, E, M> vertex = newVertexInstance(vertexClass, conf);
- vertex.setPeer(peer);
vertex.runner = graphJobRunner;
long startPos = peer.getPos();
@@ -127,27 +128,17 @@ public abstract class GraphJobRunnerBase
vertex.setEdges(new ArrayList<Edge<V, E>>(0));
}
if (selfReference) {
- vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), peer.getPeerName(),
- null));
+ vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
}
if (runtimePartitioning) {
int partition = partitioner.getPartition(vertex.getVertexID(),
vertex.getValue(), peer.getNumPeers());
- // set the destination name for the edge now
- for (Edge<V, E> edge : vertex.getEdges()) {
- int edgePartition = partitioner.getPartition(
- edge.getDestinationVertexID(), (M) edge.getValue(),
- peer.getNumPeers());
- edge.destinationPeerName = peer.getPeerName(edgePartition);
- }
peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
} else {
- // FIXME need to set destination names
vertex.setup(conf);
vertices.put(vertex.getVertexID(), vertex);
}
vertex = newVertexInstance(vertexClass, conf);
- vertex.setPeer(peer);
vertex.runner = graphJobRunner;
if (runtimePartitioning) {
@@ -157,7 +148,6 @@ public abstract class GraphJobRunnerBase
GraphJobMessage msg = null;
while ((msg = peer.getCurrentMessage()) != null) {
Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
- messagedVertex.setPeer(peer);
messagedVertex.runner = graphJobRunner;
messagedVertex.setup(conf);
vertices.put(messagedVertex.getVertexID(), messagedVertex);
@@ -173,7 +163,6 @@ public abstract class GraphJobRunnerBase
GraphJobMessage msg = null;
while ((msg = peer.getCurrentMessage()) != null) {
Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
- messagedVertex.setPeer(peer);
messagedVertex.runner = graphJobRunner;
messagedVertex.setup(conf);
vertices.put(messagedVertex.getVertexID(), messagedVertex);
@@ -238,8 +227,9 @@ public abstract class GraphJobRunnerBase
int i = 0;
int syncs = 0;
for (V v : keys) {
+ Vertex<V, E, M> vertex2 = vertices.get(v);
for (Edge<V, E> e : vertices.get(v).getEdges()) {
- peer.send(e.getDestinationPeerName(),
+ peer.send(vertex2.getDestinationPeerName(e),
new GraphJobMessage(e.getDestinationVertexID()));
}
@@ -251,16 +241,11 @@ public abstract class GraphJobRunnerBase
V vertexName = (V) msg.getVertexId();
if (!vertices.containsKey(vertexName)) {
Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
- newVertex.setPeer(peer);
newVertex.setVertexID(vertexName);
newVertex.runner = graphJobRunner;
if (selfReference) {
- int partition = partitioner.getPartition(
- newVertex.getVertexID(), newVertex.getValue(),
- peer.getNumPeers());
- String target = peer.getPeerName(partition);
newVertex.setEdges(Collections.singletonList(new Edge<V, E>(
- newVertex.getVertexID(), target, null)));
+ newVertex.getVertexID(), null)));
} else {
newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
}
@@ -278,15 +263,11 @@ public abstract class GraphJobRunnerBase
V vertexName = (V) msg.getVertexId();
if (!vertices.containsKey(vertexName)) {
Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
- newVertex.setPeer(peer);
newVertex.setVertexID(vertexName);
newVertex.runner = graphJobRunner;
if (selfReference) {
- int partition = partitioner.getPartition(newVertex.getVertexID(),
- newVertex.getValue(), peer.getNumPeers());
- String target = peer.getPeerName(partition);
newVertex.setEdges(Collections.singletonList(new Edge<V, E>(
- newVertex.getVertexID(), target, null)));
+ newVertex.getVertexID(), null)));
} else {
newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
}
@@ -477,4 +458,8 @@ public abstract class GraphJobRunnerBase
return globalAggregatorIncrement[index];
}
+ public BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> getPeer() {
+ return peer;
+ }
+
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1383326&r1=1383325&r2=1383326&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Tue Sep 11
09:47:46 2012
@@ -30,16 +30,16 @@ import org.apache.hama.bsp.Partitioner;
public abstract class Vertex<V extends Writable, E extends Writable, M extends Writable>
implements VertexInterface<V, E, M> {
+ GraphJobRunner<?, ?, ?> runner;
+
private V vertexID;
private M value;
- protected GraphJobRunner<V, E, M> runner;
- private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
private List<Edge<V, E>> edges;
private boolean votedToHalt = false;
public Configuration getConf() {
- return peer.getConfiguration();
+ return runner.getPeer().getConfiguration();
}
@Override
@@ -53,10 +53,28 @@ public abstract class Vertex<V extends W
@Override
public void sendMessage(Edge<V, E> e, M msg) throws IOException {
- peer.send(e.getDestinationPeerName(),
+ runner.getPeer().send(getDestinationPeerName(e),
new GraphJobMessage(e.getDestinationVertexID(), msg));
}
+ /**
+ * @return the destination peer name of the destination of the given directed
+ * edge.
+ */
+ public String getDestinationPeerName(Edge<V, E> edge) {
+ return getDestinationPeerName(edge.getDestinationVertexID());
+ }
+
+ /**
+ * @return the destination peer name of the given vertex id, determined by
the
+ * partitioner.
+ */
+ public String getDestinationPeerName(V vertexId) {
+ return runner.getPeer().getPeerName(
+ getPartitioner().getPartition(vertexId, value,
+ runner.getPeer().getNumPeers()));
+ }
+
@Override
public void sendMessageToNeighbors(M msg) throws IOException {
final List<Edge<V, E>> outEdges = this.getEdges();
@@ -68,9 +86,10 @@ public abstract class Vertex<V extends W
@Override
public void sendMessage(V destinationVertexID, M msg) throws IOException {
int partition = getPartitioner().getPartition(destinationVertexID, msg,
- peer.getNumPeers());
- String destPeer = peer.getAllPeerNames()[partition];
- peer.send(destPeer, new GraphJobMessage(destinationVertexID, msg));
+ runner.getPeer().getNumPeers());
+ String destPeer = runner.getPeer().getAllPeerNames()[partition];
+ runner.getPeer().send(destPeer,
+ new GraphJobMessage(destinationVertexID, msg));
}
@Override
@@ -84,7 +103,7 @@ public abstract class Vertex<V extends W
public void addEdge(Edge<V, E> edge) {
if (edges == null) {
- this.edges = new ArrayList<Edge<V, E>>();
+ this.edges = new ArrayList<Edge<V, E>>(1);
}
this.edges.add(edge);
}
@@ -138,23 +157,22 @@ public abstract class Vertex<V extends W
}
public int getNumPeers() {
- return peer.getNumPeers();
+ return runner.getPeer().getNumPeers();
}
/**
* Gives access to the BSP primitives and additional features by a peer.
*/
public BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> getPeer() {
- return peer;
+ return runner.getPeer();
}
+ /**
+ * @return the configured partitioner instance to message vertices.
+ */
+ @SuppressWarnings("unchecked")
public Partitioner<V, M> getPartitioner() {
- return runner.getPartitioner();
- }
-
- void setPeer(
- BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
- this.peer = peer;
+ return (Partitioner<V, M>) runner.getPartitioner();
}
@Override