Author: edwardyoon
Date: Tue Apr 14 02:19:07 2015
New Revision: 1673339
URL: http://svn.apache.org/r1673339
Log:
Use thread pool
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Tue Apr 14
02:19:07 2015
@@ -204,8 +204,4 @@ public interface BSPPeer<K1, V1, K2, V2,
*/
public TaskAttemptID getTaskId();
- public List<List<M>> getSubLists(int num);
-
- public void clearIncomingMessages();
-
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Tue Apr
14 02:19:07 2015
@@ -512,11 +512,6 @@ public final class BSPPeerImpl<K1, V1, K
messenger.clearOutgoingMessages();
}
- @Override
- public final void clearIncomingMessages() {
- messenger.clearIncomingMessages();
- }
-
/**
* @return the string as host:port of this Peer
*/
@@ -673,9 +668,4 @@ public final class BSPPeerImpl<K1, V1, K
return taskId;
}
- @Override
- public List<List<M>> getSubLists(int num) {
- return messenger.getSubLists(num);
- }
-
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
Tue Apr 14 02:19:07 2015
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.LinkedList;
-import java.util.List;
import java.util.Map.Entry;
import java.util.Queue;
@@ -122,11 +121,6 @@ public abstract class AbstractMessageMan
return localQueue.poll();
}
- @Override
- public List<List<M>> getSubLists(int num) {
- return localQueue.getSubLists(num);
- }
-
/*
* (non-Javadoc)
* @see org.apache.hama.bsp.message.MessageManager#getNumCurrentMessages()
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
Tue Apr 14 02:19:07 2015
@@ -36,9 +36,9 @@ import org.apache.hama.bsp.BSPMessageBun
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.TaskAttemptID;
-import org.apache.hama.ipc.HamaRPCProtocolVersion;
import org.apache.hama.ipc.AsyncRPC;
import org.apache.hama.ipc.AsyncServer;
+import org.apache.hama.ipc.HamaRPCProtocolVersion;
import org.apache.hama.util.LRUCache;
/**
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
Tue Apr 14 02:19:07 2015
@@ -24,8 +24,6 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
Tue Apr 14 02:19:07 2015
@@ -20,7 +20,6 @@ package org.apache.hama.bsp.message;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
-import java.util.List;
import java.util.Map.Entry;
import org.apache.hadoop.io.Writable;
@@ -92,11 +91,6 @@ public interface MessageManager<M extend
public void clearOutgoingMessages();
/**
- * Clears the incoming queue. Can be used to switch queues.
- */
- public void clearIncomingMessages();
-
- /**
* Gets the number of messages in the current queue.
*
*/
@@ -129,5 +123,4 @@ public interface MessageManager<M extend
*/
public InetSocketAddress getListenerAddress();
- public List<List<M>> getSubLists(int num);
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
Tue Apr 14 02:19:07 2015
@@ -116,15 +116,4 @@ public final class MemoryQueue<M extends
return this;
}
- @Override
- public List<List<M>> getSubLists(int num) {
- List<List<M>> subLists = new ArrayList<List<M>>();
- subLists.add(Lists.newArrayList(deque.iterator()));
- Iterator<BSPMessageBundle<M>> it = bundles.iterator();
- while (it.hasNext()) {
- subLists.add(Lists.newArrayList(it.next().iterator()));
- }
- return subLists;
- }
-
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
Tue Apr 14 02:19:07 2015
@@ -17,8 +17,6 @@
*/
package org.apache.hama.bsp.message.queue;
-import java.util.List;
-
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
@@ -64,8 +62,6 @@ public interface MessageQueue<M extends
*/
public void add(M item);
- public List<List<M>> getSubLists(int num);
-
/**
* Clears all entries in the given queue.
*/
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
Tue Apr 14 02:19:07 2015
@@ -17,8 +17,6 @@
*/
package org.apache.hama.bsp.message.queue;
-import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPMessageBundle;
@@ -186,8 +184,4 @@ public final class SingleLockQueue<T ext
}
}
- @Override
- public List<List<T>> getSubLists(int num) {
- return queue.getSubLists(num);
- }
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
Tue Apr 14 02:19:07 2015
@@ -17,7 +17,6 @@
*/
package org.apache.hama.bsp.message.queue;
-import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
@@ -26,8 +25,6 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.TaskAttemptID;
-import com.google.common.collect.Lists;
-
/**
* Heap (Java's priority queue) based message queue implementation that
supports
* sorted receive and send.
@@ -103,9 +100,4 @@ public final class SortedMemoryQueue<M e
return this;
}
- @Override
- public List<List<M>> getSubLists(int num) {
- return Lists.partition(Lists.newArrayList(queue.iterator()), num);
- }
-
}
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
(original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Tue
Apr 14 02:19:07 2015
@@ -151,18 +151,6 @@ public class TestCheckpoint extends Test
return null;
}
- @Override
- public List<List<Text>> getSubLists(int num) {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public void clearIncomingMessages() {
- // TODO Auto-generated method stub
-
- }
-
}
public static class TestBSPPeer implements
@@ -319,18 +307,6 @@ public class TestCheckpoint extends Test
return null;
}
- @Override
- public List<List<Text>> getSubLists(int num) {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public void clearIncomingMessages() {
- // TODO Auto-generated method stub
-
- }
-
}
public static class TempSyncClient extends BSPPeerSyncClient {
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Tue Apr 14 02:19:07 2015
@@ -18,11 +18,11 @@
package org.apache.hama.graph;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
-import java.util.List;
import java.util.Map.Entry;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -43,8 +43,6 @@ import org.apache.hama.bsp.sync.SyncExce
import org.apache.hama.commons.util.KeyValuePair;
import org.apache.hama.util.ReflectionUtils;
-import com.google.common.collect.Lists;
-
/**
* Fully generic graph job runner.
*
@@ -232,47 +230,29 @@ public final class GraphJobRunner<V exte
* iterate over our messages and vertices in sorted order. That means that we
* need to seek the first vertex that has the same ID as the iterated
message.
*/
+ @SuppressWarnings("unchecked")
private void doSuperstep(GraphJobMessage currentMessage,
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
this.changedVertexCnt = 0;
vertices.startSuperstep();
- List<Thread> runners = new ArrayList<Thread>();
+ ExecutorService executor = Executors.newFixedThreadPool(conf.getInt(
+ "hama.graph.thread.num", 1000));
- List<List<GraphJobMessage>> subLists = peer.getSubLists(conf.getInt(
- "hama.graph.thread.num", 100));
-
- if (subLists.size() == 0) {
- if (currentMessage != null) {
- List<GraphJobMessage> first = new ArrayList<GraphJobMessage>();
- first.add(currentMessage);
- runners.add(new ComputeReceivedMessage(first));
- }
- } else {
- for (List<GraphJobMessage> subList : subLists) {
- if (runners.size() == 0)
- subList.add(currentMessage);
-
- runners.add(new ComputeReceivedMessage(subList));
- }
- }
-
- for (Thread computer : runners) {
- computer.start();
- }
-
- for (Thread computer : runners) {
- try {
- computer.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+ while (currentMessage != null) {
+ Runnable worker = new ComputeRunnable(
+ vertices.get((V) currentMessage.getVertexId()),
+ (Iterable<M>) currentMessage.getIterableMessages());
+ executor.execute(worker);
+
+ currentMessage = peer.getCurrentMessage();
+ }
+
+ executor.shutdown();
+ while (!executor.isTerminated()) {
}
- // After using getSubLists(), we need to clean up local vertex messages.
- peer.clearIncomingMessages();
-
for (V v : vertices.getNotComputedVertices()) {
if (!vertices.get(v).isHalted()) {
Vertex<V, E, M> vertex = vertices.get(v);
@@ -298,74 +278,36 @@ public final class GraphJobRunner<V exte
this.changedVertexCnt = 0;
vertices.startSuperstep();
- List<Thread> runners = new ArrayList<Thread>();
-
- for (List<V> vLists : Lists.partition(new ArrayList<V>(vertices.keySet()),
- conf.getInt("hama.graph.thread.num", 100))) {
- runners.add(new Computer(vLists));
- }
-
- for (Thread computer : runners) {
- computer.start();
+ ExecutorService executor = Executors.newFixedThreadPool(conf.getInt(
+ "hama.graph.thread.num", 1000));
+
+ for(Vertex<V, E, M> v : vertices.getValues()) {
+ Runnable worker = new ComputeRunnable(v,
Collections.singleton(v.getValue()));
+ executor.execute(worker);
}
-
- for (Thread computer : runners) {
- try {
- computer.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+ executor.shutdown();
+ while (!executor.isTerminated()) {
}
-
+
vertices.finishSuperstep();
getAggregationRunner().sendAggregatorValues(peer, 1,
this.changedVertexCnt);
iteration++;
}
- class ComputeReceivedMessage extends Thread {
- List<GraphJobMessage> subList;
-
- public ComputeReceivedMessage(List<GraphJobMessage> subList) {
- this.subList = subList;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void run() {
- try {
- for (GraphJobMessage msg : subList) {
- Vertex<V, E, M> vertex = vertices.get((V) msg.getVertexId());
-
- // reactivation
- if (vertex.isHalted()) {
- vertex.setActive();
- }
-
- vertex.compute((Iterable<M>) msg.getIterableMessages());
- vertices.finishVertexComputation(vertex);
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- class Computer extends Thread {
- List<V> vList;
-
- public Computer(List<V> vList) {
- this.vList = vList;
+ class ComputeRunnable implements Runnable {
+ Vertex<V, E, M> vertex;
+ Iterable<M> msgs;
+
+ public ComputeRunnable(Vertex<V, E, M> vertex, Iterable<M> msgs) {
+ this.vertex = vertex;
+ this.msgs = msgs;
}
@Override
public void run() {
try {
- for (V v : vList) {
- Vertex<V, E, M> vertex = vertices.get(v);
- vertex.setup(conf);
- vertex.compute(Collections.singleton(vertex.getValue()));
+ vertex.compute(msgs);
vertices.finishVertexComputation(vertex);
- }
} catch (IOException e) {
e.printStackTrace();
}
@@ -430,15 +372,17 @@ public final class GraphJobRunner<V exte
private void loadVertices(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException, SyncException, InterruptedException {
- final VertexInputReader<Writable, Writable, V, E, M> reader =
(VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
+ VertexInputReader<Writable, Writable, V, E, M> reader =
(VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
.newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER,
VertexInputReader.class));
+ ExecutorService executor = Executors.newFixedThreadPool(conf.getInt(
+ "hama.graph.thread.num", 1000));
+
try {
KeyValuePair<Writable, Writable> next = null;
while ((next = peer.readNext()) != null) {
- // TODO read sequentially, and convert records using thread.
-
+
Vertex<V, E, M> vertex = GraphJobRunner
.<V, E, M> newVertexInstance(VERTEX_CLASS);
@@ -450,7 +394,8 @@ public final class GraphJobRunner<V exte
String dstHost = getHostName(vertex.getVertexID());
if (peer.getPeerName().equals(dstHost)) {
- addVertex(vertex);
+ Runnable worker = new LoadWorker(vertex);
+ executor.execute(worker);
} else {
peer.send(dstHost, new GraphJobMessage(vertex));
}
@@ -458,46 +403,33 @@ public final class GraphJobRunner<V exte
} catch (Exception e) {
e.printStackTrace();
}
+
peer.sync();
- List<List<GraphJobMessage>> subLists = peer.getSubLists(conf.getInt(
- "hama.graph.thread.num", 100));
- List<Thread> runners = new ArrayList<Thread>(subLists.size());
-
- for (List<GraphJobMessage> subList : subLists) {
- runners.add(new LoadReceivedMessage(subList));
- }
-
- for (Thread computer : runners) {
- computer.start();
+ GraphJobMessage msg;
+ while ((msg = peer.getCurrentMessage()) != null) {
+ Runnable worker = new LoadWorker((Vertex<V, E, M>) msg.getVertex());
+ executor.execute(worker);
}
-
- for (Thread computer : runners) {
- try {
- computer.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+ executor.shutdown();
+ while (!executor.isTerminated()) {
}
LOG.info(vertices.size() + " vertices are loaded into "
+ peer.getPeerName());
}
+
+ class LoadWorker implements Runnable {
+ Vertex<V, E, M> vertex;
- class LoadReceivedMessage extends Thread {
- List<GraphJobMessage> subList;
-
- public LoadReceivedMessage(List<GraphJobMessage> subList) {
- this.subList = subList;
+ public LoadWorker(Vertex<V, E, M> vertex) {
+ this.vertex = vertex;
}
- @SuppressWarnings("unchecked")
@Override
public void run() {
try {
- for (GraphJobMessage msg : subList) {
- addVertex((Vertex<V, E, M>) msg.getVertex());
- }
+ addVertex(vertex);
} catch (IOException e) {
e.printStackTrace();
}
@@ -516,6 +448,9 @@ public final class GraphJobRunner<V exte
vertex.setRunner(this);
vertices.put(vertex);
+
+ // call once
+ vertex.setup(conf);
}
/**
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java?rev=1673339&r1=1673338&r2=1673339&view=diff
==============================================================================
---
hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
(original)
+++
hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
Tue Apr 14 02:19:07 2015
@@ -17,8 +17,6 @@
*/
package org.apache.hama.graph;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.conf.Configuration;
@@ -28,8 +26,6 @@ import org.apache.hama.bsp.TaskAttemptID
import org.apache.hama.bsp.message.queue.MessageQueue;
import org.apache.hama.bsp.message.queue.SynchronizedQueue;
-import com.google.common.collect.Lists;
-
public class IncomingVertexMessageManager<M extends WritableComparable<M>>
implements SynchronizedQueue<GraphJobMessage> {
@@ -111,12 +107,4 @@ public class IncomingVertexMessageManage
return this;
}
- @Override
- public List<List<GraphJobMessage>> getSubLists(int num) {
- if (mapMessages.size() > 0)
- return Lists.partition(new ArrayList<GraphJobMessage>(mapMessages), num);
- else
- return msgPerVertex.getSubLists(num);
- }
-
}