Author: edwardyoon
Date: Wed Apr 22 01:56:34 2015
New Revision: 1675244
URL: http://svn.apache.org/r1675244
Log:
minor code optimization
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1675244&r1=1675243&r2=1675244&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
Wed Apr 22 01:56:34 2015
@@ -91,6 +91,10 @@ public final class GraphJobMessage imple
public MapWritable getMap() {
return map;
}
+
+ public void setVertexId(WritableComparable<?> vertexId) {
+ this.vertexId = vertexId;
+ }
public WritableComparable<?> getVertexId() {
return vertexId;
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1675244&r1=1675243&r2=1675244&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Wed Apr 22 01:56:34 2015
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -117,7 +118,7 @@ public final class GraphJobRunner<V exte
private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage>
peer;
private RejectedExecutionHandler retryHandler = new
RetryRejectedExecutionHandler();
-
+
@Override
public final void setup(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
@@ -170,10 +171,6 @@ public final class GraphJobRunner<V exte
// loop over vertices and do their computation
doSuperstep(firstVertexMessage, peer);
-
- if (isMasterTask(peer)) {
- peer.getCounter(GraphJobCounter.ITERATIONS).increment(1);
- }
}
}
@@ -268,7 +265,10 @@ public final class GraphJobRunner<V exte
+ " looping: " + (System.currentTimeMillis() - loopStartTime) + " ms");
executor.shutdown();
- while (!executor.isTerminated()) {
+ try {
+ executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOG.error(e);
}
for (V v : vertices.getNotComputedVertices()) {
@@ -307,16 +307,19 @@ public final class GraphJobRunner<V exte
.newCachedThreadPool();
executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 256));
executor.setRejectedExecutionHandler(retryHandler);
-
+
for (Vertex<V, E, M> v : vertices.getValues()) {
Runnable worker = new ComputeRunnable(v);
executor.execute(worker);
}
executor.shutdown();
- while (!executor.isTerminated()) {
+ try {
+ executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOG.error(e);
}
-
+
getAggregationRunner().sendAggregatorValues(peer, 1,
this.changedVertexCnt);
iteration++;
finishSuperstep();
@@ -411,8 +414,6 @@ public final class GraphJobRunner<V exte
EDGE_VALUE_CLASS = edgeValueClass;
}
- private Map<String, GraphJobMessage> messages = new HashMap<String,
GraphJobMessage>();
-
/**
* Loads vertices into memory of each peer.
*/
@@ -420,6 +421,8 @@ public final class GraphJobRunner<V exte
private void loadVertices(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException, SyncException, InterruptedException {
+ final Map<String, GraphJobMessage> messages = new HashMap<String,
GraphJobMessage>();
+
VertexInputReader<Writable, Writable, V, E, M> reader =
(VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
.newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER,
VertexInputReader.class));
@@ -428,7 +431,7 @@ public final class GraphJobRunner<V exte
.newCachedThreadPool();
executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 256));
executor.setRejectedExecutionHandler(retryHandler);
-
+
try {
KeyValuePair<Writable, Writable> next = null;
while ((next = peer.readNext()) != null) {
@@ -462,8 +465,7 @@ public final class GraphJobRunner<V exte
peer.send(e.getKey(), e.getValue());
}
messages.clear();
- messages = null;
-
+
peer.sync();
GraphJobMessage msg;
@@ -480,8 +482,7 @@ public final class GraphJobRunner<V exte
}
}
executor.shutdown();
- while (!executor.isTerminated()) {
- }
+ executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
LOG.info(vertices.size() + " vertices are loaded into "
+ peer.getPeerName());
@@ -646,7 +647,8 @@ public final class GraphJobRunner<V exte
if (storage.containsKey(vertexID)) {
storage.get(vertexID).add(msg);
} else {
- storage.put(vertexID, new GraphJobMessage(vertexID, msg));
+ // To save a bit of memory, we don't set the vertexID twice
+ storage.put(vertexID, new GraphJobMessage(null, msg));
}
}
@@ -656,6 +658,8 @@ public final class GraphJobRunner<V exte
Iterator<Entry<V, GraphJobMessage>> it = storage.entrySet().iterator();
while (it.hasNext()) {
Entry<V, GraphJobMessage> e = it.next();
+ it.remove();
+
if (combiner != null && e.getValue().getNumOfValues() > 1) {
peer.send(
getHostName(e.getKey()),
@@ -663,9 +667,15 @@ public final class GraphJobRunner<V exte
.combine(getIterableMessages(e.getValue().getValuesBytes(), e
.getValue().getNumOfValues())))));
} else {
+ // set vertexID
+ e.getValue().setVertexId(e.getKey());
peer.send(getHostName(e.getKey()), e.getValue());
}
- it.remove();
+ }
+ storage.clear();
+
+ if (isMasterTask(peer)) {
+ peer.getCounter(GraphJobCounter.ITERATIONS).increment(1);
}
}
@@ -673,7 +683,7 @@ public final class GraphJobRunner<V exte
ByteArrayOutputStream a = new ByteArrayOutputStream();
DataOutputStream b = new DataOutputStream(a);
writable.write(b);
-
+ a.close();
return a.toByteArray();
}
@@ -732,7 +742,7 @@ public final class GraphJobRunner<V exte
* edge.
*/
public String getHostName(V vertexID) {
- return peer.getPeerName(getPartitioner().getPartition(vertexID, null,
+ return peer.getPeerName(partitioner.getPartition(vertexID, null,
peer.getNumPeers()));
}
@@ -758,13 +768,6 @@ public final class GraphJobRunner<V exte
}
/**
- * @return the defined partitioner instance.
- */
- public final Partitioner<V, M> getPartitioner() {
- return partitioner;
- }
-
- /**
* Gets the last aggregated value at the given index. The index is dependent
* on how the aggregators were configured during job setup phase.
*
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1675244&r1=1675243&r2=1675244&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Wed Apr 22
01:56:34 2015
@@ -30,7 +30,6 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.Counters.Counter;
-import org.apache.hama.bsp.Partitioner;
/**
* Vertex is an abstract definition of Google Pregel Vertex. For implementing a
@@ -76,14 +75,15 @@ public abstract class Vertex<V extends W
@Override
public void sendMessage(Edge<V, E> e, M msg) throws IOException {
- runner.sendMessage(e.getDestinationVertexID(),
GraphJobRunner.serialize(msg));
+ runner.sendMessage(e.getDestinationVertexID(),
+ GraphJobRunner.serialize(msg));
}
@Override
public void sendMessage(V destinationVertexID, M msg) throws IOException {
runner.sendMessage(destinationVertexID, GraphJobRunner.serialize(msg));
}
-
+
@Override
public void sendMessageToNeighbors(M msg) throws IOException {
final List<Edge<V, E>> outEdges = this.getEdges();
@@ -109,12 +109,8 @@ public abstract class Vertex<V extends W
vertex.setVertexID(vertexID);
msg.put(GraphJobRunner.FLAG_VERTEX_INCREASE, vertex);
- // Find the proper partition to host the new vertex.
- int partition = getPartitioner().getPartition(vertexID, value,
- runner.getPeer().getNumPeers());
- String destPeer = runner.getPeer().getAllPeerNames()[partition];
-
- runner.getPeer().send(destPeer, new GraphJobMessage(msg));
+ runner.getPeer().send(runner.getHostName(vertexID),
+ new GraphJobMessage(msg));
alterVertexCounter(1);
}
@@ -182,13 +178,6 @@ public abstract class Vertex<V extends W
return runner.getPeer();
}
- /**
- * @return the configured partitioner instance to message vertices.
- */
- public Partitioner<V, M> getPartitioner() {
- return runner.getPartitioner();
- }
-
@Override
public long getTotalNumVertices() {
return runner.getNumberVertices();
@@ -345,7 +334,7 @@ public abstract class Vertex<V extends W
protected GraphJobRunner<V, E, M> getRunner() {
return runner;
}
-
+
@Override
public void aggregate(int index, M value) throws IOException {
this.runner.getAggregationRunner().aggregateVertex(index, oldValue, value);
@@ -364,7 +353,7 @@ public abstract class Vertex<V extends W
public M getAggregatedValue(int index) {
return (M) runner.getLastAggregatedValue(index);
}
-
+
/**
* Get the number of aggregated vertices in the last superstep. Or null if no
* aggregator is available. You have to supply an index, the index is defined
@@ -381,7 +370,7 @@ public abstract class Vertex<V extends W
public Counter getCounter(Enum<?> name) {
return runner.getPeer().getCounter(name);
}
-
+
@Override
public Counter getCounter(String group, String name) {
return runner.getPeer().getCounter(group, name);