Author: edwardyoon
Date: Mon Apr 13 07:52:45 2015
New Revision: 1673127
URL: http://svn.apache.org/r1673127
Log:
HAMA-526: Multi-threaded vertex processing
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/MaxFlow.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Mon Apr 13
07:52:45 2015
@@ -18,6 +18,7 @@
package org.apache.hama.bsp;
import java.io.IOException;
+import java.util.List;
import org.apache.hadoop.io.Writable;
import org.apache.hama.Constants;
@@ -202,4 +203,9 @@ public interface BSPPeer<K1, V1, K2, V2,
* @return the task id of this task.
*/
public TaskAttemptID getTaskId();
+
+ public List<List<M>> getSubLists(int num);
+
+ public void clearIncomingMessages();
+
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Mon Apr
13 07:52:45 2015
@@ -511,6 +511,11 @@ public final class BSPPeerImpl<K1, V1, K
public final void clear() {
messenger.clearOutgoingMessages();
}
+
+ @Override
+ public final void clearIncomingMessages() {
+ messenger.clearIncomingMessages();
+ }
/**
* @return the string as host:port of this Peer
@@ -668,4 +673,9 @@ public final class BSPPeerImpl<K1, V1, K
return taskId;
}
+ @Override
+ public List<List<M>> getSubLists(int num) {
+ return messenger.getSubLists(num);
+ }
+
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
Mon Apr 13 07:52:45 2015
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map.Entry;
import java.util.Queue;
@@ -121,6 +122,11 @@ public abstract class AbstractMessageMan
return localQueue.poll();
}
+ @Override
+ public List<List<M>> getSubLists(int num) {
+ return localQueue.getSubLists(num);
+ }
+
/*
* (non-Javadoc)
* @see org.apache.hama.bsp.message.MessageManager#getNumCurrentMessages()
@@ -129,6 +135,10 @@ public abstract class AbstractMessageMan
public final int getNumCurrentMessages() {
return localQueue.size();
}
+
+ public void clearIncomingMessages() {
+ localQueue.clear();
+ }
/*
* (non-Javadoc)
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
Mon Apr 13 07:52:45 2015
@@ -24,6 +24,8 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
Mon Apr 13 07:52:45 2015
@@ -20,6 +20,7 @@ package org.apache.hama.bsp.message;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
+import java.util.List;
import java.util.Map.Entry;
import org.apache.hadoop.io.Writable;
@@ -91,6 +92,11 @@ public interface MessageManager<M extend
public void clearOutgoingMessages();
/**
+ * Clears the incoming queue. Can be used to switch queues.
+ */
+ public void clearIncomingMessages();
+
+ /**
* Gets the number of messages in the current queue.
*
*/
@@ -122,4 +128,6 @@ public interface MessageManager<M extend
* on.
*/
public InetSocketAddress getListenerAddress();
+
+ public List<List<M>> getSubLists(int num);
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
Mon Apr 13 07:52:45 2015
@@ -17,7 +17,9 @@
*/
package org.apache.hama.bsp.message.queue;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.conf.Configuration;
@@ -25,6 +27,8 @@ import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.TaskAttemptID;
+import com.google.common.collect.Lists;
+
/**
* LinkedList backed queue structure for bookkeeping messages.
*/
@@ -87,11 +91,6 @@ public final class MemoryQueue<M extends
}
@Override
- public final Iterator<M> iterator() {
- return deque.iterator();
- }
-
- @Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@@ -117,4 +116,15 @@ public final class MemoryQueue<M extends
return this;
}
+ @Override
+ public List<List<M>> getSubLists(int num) {
+ List<List<M>> subLists = new ArrayList<List<M>>();
+ subLists.add(Lists.newArrayList(deque.iterator()));
+ Iterator<BSPMessageBundle<M>> it = bundles.iterator();
+ while (it.hasNext()) {
+ subLists.add(Lists.newArrayList(it.next().iterator()));
+ }
+ return subLists;
+ }
+
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
Mon Apr 13 07:52:45 2015
@@ -17,6 +17,8 @@
*/
package org.apache.hama.bsp.message.queue;
+import java.util.List;
+
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
@@ -26,8 +28,7 @@ import org.apache.hama.bsp.TaskAttemptID
/**
* Simple queue interface.
*/
-public interface MessageQueue<M extends Writable> extends Iterable<M>,
- Configurable {
+public interface MessageQueue<M extends Writable> extends Configurable {
public static final String PERSISTENT_QUEUE =
"hama.queue.behaviour.persistent";
@@ -63,6 +64,8 @@ public interface MessageQueue<M extends
*/
public void add(M item);
+ public List<List<M>> getSubLists(int num);
+
/**
* Clears all entries in the given queue.
*/
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
Mon Apr 13 07:52:45 2015
@@ -17,7 +17,7 @@
*/
package org.apache.hama.bsp.message.queue;
-import java.util.Iterator;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
@@ -45,17 +45,6 @@ public final class SingleLockQueue<T ext
/*
* (non-Javadoc)
- * @see org.apache.hama.bsp.message.SynchronizedQueue#iterator()
- */
- @Override
- public Iterator<T> iterator() {
- synchronized (mutex) {
- return queue.iterator();
- }
- }
-
- /*
- * (non-Javadoc)
* @see
* org.apache.hama.bsp.message.SynchronizedQueue#setConf(org.apache.hadoop
* .conf.Configuration)
@@ -196,4 +185,9 @@ public final class SingleLockQueue<T ext
queue.addAll(otherqueue);
}
}
+
+ @Override
+ public List<List<T>> getSubLists(int num) {
+ return queue.getSubLists(num);
+ }
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
Mon Apr 13 07:52:45 2015
@@ -17,7 +17,7 @@
*/
package org.apache.hama.bsp.message.queue;
-import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
@@ -26,6 +26,8 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.TaskAttemptID;
+import com.google.common.collect.Lists;
+
/**
* Heap (Java's priority queue) based message queue implementation that
supports
* sorted receive and send.
@@ -37,11 +39,6 @@ public final class SortedMemoryQueue<M e
private Configuration conf;
@Override
- public Iterator<M> iterator() {
- return queue.iterator();
- }
-
- @Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@@ -106,4 +103,9 @@ public final class SortedMemoryQueue<M e
return this;
}
+ @Override
+ public List<List<M>> getSubLists(int num) {
+ return Lists.partition(Lists.newArrayList(queue.iterator()), num);
+ }
+
}
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
(original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Mon
Apr 13 07:52:45 2015
@@ -151,6 +151,18 @@ public class TestCheckpoint extends Test
return null;
}
+ @Override
+ public List<List<Text>> getSubLists(int num) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void clearIncomingMessages() {
+ // TODO Auto-generated method stub
+
+ }
+
}
public static class TestBSPPeer implements
@@ -307,6 +319,18 @@ public class TestCheckpoint extends Test
return null;
}
+ @Override
+ public List<List<Text>> getSubLists(int num) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void clearIncomingMessages() {
+ // TODO Auto-generated method stub
+
+ }
+
}
public static class TempSyncClient extends BSPPeerSyncClient {
Modified:
hama/trunk/examples/src/main/java/org/apache/hama/examples/MaxFlow.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/MaxFlow.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/MaxFlow.java
(original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/MaxFlow.java Mon
Apr 13 07:52:45 2015
@@ -173,6 +173,7 @@ public class MaxFlow {
stepStatusDetecting(haveActivingNormalVertex);
boolean pushStepCompleted = pushStepCompleted(haveActivingNormalVertex);
boolean senseStepCompleted =
senseStepCompleted(haveActivingNormalVertex);
+
if (senseStepCompleted) {
aggregate();
for (FloatArrayWritable msg : overFlowMsgList) {
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
Mon Apr 13 07:52:45 2015
@@ -135,10 +135,10 @@ public final class GraphJobMessage imple
ByteArrayOutputStream a = new ByteArrayOutputStream();
DataOutputStream b = new DataOutputStream(a);
value.write(b);
+
byteBuffer.write(a.toByteArray());
numOfValues++;
} catch (IOException e) {
- // TODO Auto-generated catch block
e.printStackTrace();
}
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Mon Apr 13 07:52:45 2015
@@ -20,11 +20,9 @@ package org.apache.hama.graph;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
-import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -228,58 +226,65 @@ public final class GraphJobRunner<V exte
}
}
- private Set<V> notComputedVertices;
-
/**
* Do the main logic of a superstep, namely checking if vertices are active,
* feeding compute with messages and controlling combiners/aggregators. We
* iterate over our messages and vertices in sorted order. That means that we
* need to seek the first vertex that has the same ID as the iterated
message.
*/
- @SuppressWarnings("unchecked")
private void doSuperstep(GraphJobMessage currentMessage,
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
- int activeVertices = 0;
this.changedVertexCnt = 0;
vertices.startSuperstep();
- notComputedVertices = new HashSet();
- notComputedVertices.addAll(vertices.keySet());
+ List<Thread> runners = new ArrayList<Thread>();
- Vertex<V, E, M> vertex = null;
+ List<List<GraphJobMessage>> subLists = peer.getSubLists(conf.getInt(
+ "hama.graph.thread.num", 100));
- while (currentMessage != null) {
- vertex = vertices.get((V) currentMessage.getVertexId());
+ if (subLists.size() == 0) {
+ if (currentMessage != null) {
+ List<GraphJobMessage> first = new ArrayList<GraphJobMessage>();
+ first.add(currentMessage);
+ runners.add(new ComputeReceivedMessage(first));
+ }
+ } else {
+ for (List<GraphJobMessage> subList : subLists) {
+ if (runners.size() == 0)
+ subList.add(currentMessage);
- // reactivation
- if (vertex.isHalted()) {
- vertex.setActive();
+ runners.add(new ComputeReceivedMessage(subList));
}
+ }
- if (!vertex.isHalted()) {
- vertex.compute((Iterable<M>) currentMessage.getIterableMessages());
- vertices.finishVertexComputation(vertex);
- activeVertices++;
+ for (Thread computer : runners) {
+ computer.start();
+ }
- notComputedVertices.remove(vertex.getVertexID());
+ for (Thread computer : runners) {
+ try {
+ computer.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
-
- currentMessage = peer.getCurrentMessage();
}
-
- for (V v : notComputedVertices) {
- vertex = vertices.get(v);
- if (!vertex.isHalted()) {
+
+ // After using getSubLists(), we need to clean up local vertex messages.
+ peer.clearIncomingMessages();
+
+ for (V v : vertices.getNotComputedVertices()) {
+ if (!vertices.get(v).isHalted()) {
+ Vertex<V, E, M> vertex = vertices.get(v);
vertex.compute(Collections.<M> emptyList());
vertices.finishVertexComputation(vertex);
- activeVertices++;
}
}
vertices.finishSuperstep();
- getAggregationRunner().sendAggregatorValues(peer, activeVertices,
- this.changedVertexCnt);
+
+ getAggregationRunner().sendAggregatorValues(peer,
+ vertices.getComputedVertices().size(), this.changedVertexCnt);
this.iteration++;
}
@@ -294,10 +299,10 @@ public final class GraphJobRunner<V exte
vertices.startSuperstep();
List<Thread> runners = new ArrayList<Thread>();
- List<Vertex<V, E, M>> v = new ArrayList<Vertex<V, E, M>>(
- vertices.getValues());
- for (List<Vertex<V, E, M>> partition : Lists.partition(v,
conf.getInt("hama.graph.thread.num", 30))) {
- runners.add(new Computer(partition));
+
+ for (List<V> vLists : Lists.partition(new ArrayList<V>(vertices.keySet()),
+ conf.getInt("hama.graph.thread.num", 100))) {
+ runners.add(new Computer(vLists));
}
for (Thread computer : runners) {
@@ -317,20 +322,49 @@ public final class GraphJobRunner<V exte
iteration++;
}
+ class ComputeReceivedMessage extends Thread {
+ List<GraphJobMessage> subList;
+
+ public ComputeReceivedMessage(List<GraphJobMessage> subList) {
+ this.subList = subList;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void run() {
+ try {
+ for (GraphJobMessage msg : subList) {
+ Vertex<V, E, M> vertex = vertices.get((V) msg.getVertexId());
+
+ // reactivation
+ if (vertex.isHalted()) {
+ vertex.setActive();
+ }
+
+ vertex.compute((Iterable<M>) msg.getIterableMessages());
+ vertices.finishVertexComputation(vertex);
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
class Computer extends Thread {
- List<Vertex<V, E, M>> partition;
+ List<V> vList;
- public Computer(List<Vertex<V, E, M>> partition) {
- this.partition = partition;
+ public Computer(List<V> vList) {
+ this.vList = vList;
}
@Override
public void run() {
try {
- for (Vertex<V, E, M> v : partition) {
- v.setup(conf);
- v.compute(Collections.singleton(v.getValue()));
- vertices.finishVertexComputation(v);
+ for (V v : vList) {
+ Vertex<V, E, M> vertex = vertices.get(v);
+ vertex.setup(conf);
+ vertex.compute(Collections.singleton(vertex.getValue()));
+ vertices.finishVertexComputation(vertex);
}
} catch (IOException e) {
e.printStackTrace();
@@ -423,9 +457,9 @@ public final class GraphJobRunner<V exte
while ((received = peer.getCurrentMessage()) != null) {
addVertex((Vertex<V, E, M>) received.getVertex());
}
+
LOG.info(vertices.size() + " vertices are loaded into "
+ peer.getPeerName());
- LOG.debug("Starting Vertex processing!");
}
/**
@@ -571,9 +605,8 @@ public final class GraphJobRunner<V exte
vertices.finishSuperstep();
}
- public void sendMessage(V dstinationVertexID, M msg) throws IOException {
- peer.send(getHostName(dstinationVertexID), new GraphJobMessage(
- dstinationVertexID, msg));
+ public void sendMessage(V vertexID, M msg) throws IOException {
+ peer.send(getHostName(vertexID), new GraphJobMessage(vertexID, msg));
}
/**
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
---
hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
(original)
+++
hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
Mon Apr 13 07:52:45 2015
@@ -17,7 +17,7 @@
*/
package org.apache.hama.graph;
-import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.conf.Configuration;
@@ -36,11 +36,6 @@ public class IncomingVertexMessageManage
private final ConcurrentLinkedQueue<GraphJobMessage> mapMessages = new
ConcurrentLinkedQueue<GraphJobMessage>();
@Override
- public Iterator<GraphJobMessage> iterator() {
- return msgPerVertex.iterator();
- }
-
- @Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@@ -80,6 +75,7 @@ public class IncomingVertexMessageManage
@Override
public void clear() {
+ mapMessages.clear();
msgPerVertex.clear();
}
@@ -94,7 +90,7 @@ public class IncomingVertexMessageManage
@Override
public int size() {
- return msgPerVertex.size();
+ return msgPerVertex.size() + mapMessages.size();
}
// empty, not needed to implement
@@ -112,5 +108,9 @@ public class IncomingVertexMessageManage
return this;
}
+ @Override
+ public List<List<GraphJobMessage>> getSubLists(int num) {
+ return msgPerVertex.getSubLists(num);
+ }
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
Mon Apr 13 07:52:45 2015
@@ -20,6 +20,7 @@ package org.apache.hama.graph;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@@ -29,6 +30,8 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.TaskAttemptID;
+import com.google.common.collect.Sets;
+
/**
* Stores the vertices into a memory-based tree map. This implementation allows
* the runtime graph modification and random access by vertex ID.
@@ -43,6 +46,8 @@ public final class MapVerticesInfo<V ext
implements VerticesInfo<V, E, M> {
private final Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E,
M>>();
+ private Set<V> computedVertices;
+
@Override
public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
TaskAttemptID attempt) throws IOException {
@@ -113,9 +118,9 @@ public final class MapVerticesInfo<V ext
}
@Override
- public void finishVertexComputation(Vertex<V, E, M> vertex)
+ public synchronized void finishVertexComputation(Vertex<V, E, M> vertex)
throws IOException {
- // do nothing
+ computedVertices.add(vertex.getVertexID());
}
@Override
@@ -128,10 +133,23 @@ public final class MapVerticesInfo<V ext
@Override
public void startSuperstep() throws IOException {
+ computedVertices = new HashSet<V>();
}
@Override
public void finishSuperstep() throws IOException {
}
+ @Override
+ public Set<V> getComputedVertices() {
+ return this.computedVertices;
+ }
+
+ public Set<V> getNotComputedVertices() {
+ return Sets.difference(vertices.keySet(), computedVertices);
+ }
+
+ public int getActiveVerticesNum() {
+ return computedVertices.size();
+ }
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java
Mon Apr 13 07:52:45 2015
@@ -18,11 +18,14 @@
package org.apache.hama.graph;
import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.io.WritableComparable;
+import com.google.common.collect.Lists;
+
public class MessagePerVertex {
@SuppressWarnings("rawtypes")
@@ -67,4 +70,8 @@ public class MessagePerVertex {
return (storage.size() > 0) ? storage.pollFirstEntry().getValue() : null;
}
+ public List<List<GraphJobMessage>> getSubLists(int num) {
+ return Lists.partition(Lists.newArrayList(iterator()), num);
+ }
+
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java?rev=1673127&r1=1673126&r2=1673127&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java Mon
Apr 13 07:52:45 2015
@@ -82,6 +82,12 @@ public interface VerticesInfo<V extends
public Collection<Vertex<V, E, M>> getValues();
+ public Set<V> getComputedVertices();
+
+ public Set<V> getNotComputedVertices();
+
+ public int getActiveVerticesNum();
+
/**
* Finish the additions, from this point on the implementations should close
* the adds and throw exceptions in case something is added after this call.