Author: edwardyoon
Date: Tue Apr 14 08:52:48 2015
New Revision: 1673392
URL: http://svn.apache.org/r1673392
Log: (empty)
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1673392&r1=1673391&r2=1673392&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Tue Apr 14 08:52:48 2015
@@ -237,18 +237,18 @@ public final class GraphJobRunner<V exte
this.changedVertexCnt = 0;
vertices.startSuperstep();
- ExecutorService executor = Executors.newFixedThreadPool(conf.getInt(
- "hama.graph.thread.num", 1000));
+ ExecutorService executor = Executors.newFixedThreadPool((peer
+ .getNumCurrentMessages() / conf.getInt(
+ "hama.graph.threadpool.percentage", 10)) + 1);
while (currentMessage != null) {
- Runnable worker = new ComputeRunnable(
- vertices.get((V) currentMessage.getVertexId()),
- (Iterable<M>) currentMessage.getIterableMessages());
+ Runnable worker = new ComputeRunnable(vertices.get((V) currentMessage
+ .getVertexId()), (Iterable<M>) currentMessage.getIterableMessages());
executor.execute(worker);
-
+
currentMessage = peer.getCurrentMessage();
}
-
+
executor.shutdown();
while (!executor.isTerminated()) {
}
@@ -278,17 +278,18 @@ public final class GraphJobRunner<V exte
this.changedVertexCnt = 0;
vertices.startSuperstep();
- ExecutorService executor = Executors.newFixedThreadPool(conf.getInt(
- "hama.graph.thread.num", 1000));
-
- for(Vertex<V, E, M> v : vertices.getValues()) {
- Runnable worker = new ComputeRunnable(v,
Collections.singleton(v.getValue()));
+ ExecutorService executor = Executors.newFixedThreadPool(vertices.size()
+ / conf.getInt("hama.graph.threadpool.percentage", 10));
+
+ for (Vertex<V, E, M> v : vertices.getValues()) {
+ Runnable worker = new ComputeRunnable(v, Collections.singleton(v
+ .getValue()));
executor.execute(worker);
}
executor.shutdown();
while (!executor.isTerminated()) {
}
-
+
vertices.finishSuperstep();
getAggregationRunner().sendAggregatorValues(peer, 1,
this.changedVertexCnt);
iteration++;
@@ -306,11 +307,11 @@ public final class GraphJobRunner<V exte
@Override
public void run() {
try {
- // call once at initial superstep
- vertex.setup(conf);
-
- vertex.compute(msgs);
- vertices.finishVertexComputation(vertex);
+ // call once at initial superstep
+ vertex.setup(conf);
+
+ vertex.compute(msgs);
+ vertices.finishVertexComputation(vertex);
} catch (IOException e) {
e.printStackTrace();
}
@@ -379,9 +380,8 @@ public final class GraphJobRunner<V exte
.newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER,
VertexInputReader.class));
- ExecutorService executor = Executors.newFixedThreadPool(conf.getInt(
- "hama.graph.thread.num", 1000));
-
+ ExecutorService executor = Executors.newFixedThreadPool(1000);
+
try {
KeyValuePair<Writable, Writable> next = null;
while ((next = peer.readNext()) != null) {
@@ -406,7 +406,7 @@ public final class GraphJobRunner<V exte
} catch (Exception e) {
e.printStackTrace();
}
-
+
peer.sync();
GraphJobMessage msg;
@@ -421,7 +421,7 @@ public final class GraphJobRunner<V exte
LOG.info(vertices.size() + " vertices are loaded into "
+ peer.getPeerName());
}
-
+
class LoadWorker implements Runnable {
Vertex<V, E, M> vertex;