Author: edwardyoon
Date: Mon Apr 20 10:18:36 2015
New Revision: 1674779
URL: http://svn.apache.org/r1674779
Log:
HAMA-946: Refactor graph package
Removed:
hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java
Modified:
hama/trunk/core/pom.xml
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java
hama/trunk/mesos/pom.xml
hama/trunk/pom.xml
Modified: hama/trunk/core/pom.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/core/pom.xml?rev=1674779&r1=1674778&r2=1674779&view=diff
==============================================================================
--- hama/trunk/core/pom.xml (original)
+++ hama/trunk/core/pom.xml Mon Apr 20 10:18:36 2015
@@ -64,6 +64,10 @@
<artifactId>commons-configuration</artifactId>
</dependency>
<dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1674779&r1=1674778&r2=1674779&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java Mon Apr 20 10:18:36 2015
@@ -17,9 +17,7 @@
*/
package org.apache.hama.bsp.message.queue;
-import java.util.ArrayList;
import java.util.Iterator;
-import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.conf.Configuration;
@@ -27,8 +25,6 @@ import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.TaskAttemptID;
-import com.google.common.collect.Lists;
-
/**
* LinkedList backed queue structure for bookkeeping messages.
*/
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1674779&r1=1674778&r2=1674779&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Mon Apr 20 10:18:36 2015
@@ -17,20 +17,14 @@
*/
package org.apache.hama.graph;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.util.ReflectionUtils;
@@ -76,20 +70,24 @@ public final class GraphJobMessage imple
this.map = map;
}
- public GraphJobMessage(WritableComparable<?> vertexId, Writable vertexValue) {
+ public GraphJobMessage(WritableComparable<?> vertexId, byte[] vertexValue) {
this.flag = VERTEX_FLAG;
this.vertexId = vertexId;
add(vertexValue);
}
- public GraphJobMessage(WritableComparable<?> vertexId, List<Writable> values) {
- this.flag = VERTEX_FLAG;
- this.vertexId = vertexId;
-
- addAll(values);
+ public GraphJobMessage(IntWritable size) {
+ this.flag = VERTICES_SIZE_FLAG;
+ this.integerMessage = size;
}
+ public GraphJobMessage(byte[] vertex) {
+ this.flag = PARTITION_FLAG;
+
+ add(vertex);
+ }
+
public MapWritable getMap() {
return map;
}
@@ -112,49 +110,19 @@ public final class GraphJobMessage imple
}
}
- public void add(Writable value) {
+ public void add(byte[] value) {
try {
- ByteArrayOutputStream a = new ByteArrayOutputStream();
- DataOutputStream b = new DataOutputStream(a);
- value.write(b);
-
- byteBuffer.write(a.toByteArray());
+ byteBuffer.write(value);
numOfValues++;
} catch (IOException e) {
e.printStackTrace();
}
}
- public void addAll(List<Writable> values) {
- ByteArrayOutputStream a = new ByteArrayOutputStream();
- DataOutputStream b = new DataOutputStream(a);
- try {
- for (Writable v : values) {
- v.write(b);
- }
-
- byteBuffer.write(a.toByteArray());
- numOfValues += values.size();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
public int getNumOfValues() {
return this.numOfValues;
}
- public GraphJobMessage(IntWritable size) {
- this.flag = VERTICES_SIZE_FLAG;
- this.integerMessage = size;
- }
-
- public GraphJobMessage(Vertex<?, ?, ?> vertex) {
- this.flag = PARTITION_FLAG;
-
- add(vertex);
- }
-
@Override
public void write(DataOutput out) throws IOException {
out.writeByte(this.flag);
@@ -327,40 +295,4 @@ public final class GraphJobMessage imple
}
}
- public Iterable<Writable> getIterableMessages() {
-
- return new Iterable<Writable>() {
- @Override
- public Iterator<Writable> iterator() {
- return new Iterator<Writable>() {
- ByteArrayInputStream bis = new ByteArrayInputStream(
- byteBuffer.toByteArray());
- DataInputStream dis = new DataInputStream(bis);
- int index = 0;
-
- @Override
- public boolean hasNext() {
- return (index < numOfValues) ? true : false;
- }
-
- @Override
- public Writable next() {
- Writable v = GraphJobRunner.createVertexValue();
- try {
- v.readFields(dis);
- } catch (IOException e) {
- e.printStackTrace();
- }
- index++;
- return v;
- }
-
- @Override
- public void remove() {
- }
- };
- }
- };
- }
-
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1674779&r1=1674778&r2=1674779&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Mon Apr 20 10:18:36 2015
@@ -18,13 +18,17 @@
package org.apache.hama.graph;
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -41,6 +45,7 @@ import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.Partitioner;
import org.apache.hama.bsp.sync.SyncException;
@@ -106,6 +111,7 @@ public final class GraphJobRunner<V exte
private AggregationRunner<V, E, M> aggregationRunner;
private VertexOutputWriter<Writable, Writable, V, E, M> vertexOutputWriter;
+ private Combiner<Writable> combiner;
private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage>
peer;
@@ -160,10 +166,7 @@ public final class GraphJobRunner<V exte
}
// loop over vertices and do their computation
- startTime = System.currentTimeMillis();
doSuperstep(firstVertexMessage, peer);
- LOG.info("Total time spent for " + peer.getSuperstepCount()
- + " superstep: " + (System.currentTimeMillis() - startTime) + " ms");
if (isMasterTask(peer)) {
peer.getCounter(GraphJobCounter.ITERATIONS).increment(1);
@@ -234,22 +237,28 @@ public final class GraphJobRunner<V exte
* iterate over our messages and vertices in sorted order. That means that we
* need to seek the first vertex that has the same ID as the iterated message.
*/
- @SuppressWarnings("unchecked")
private void doSuperstep(GraphJobMessage currentMessage,
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
+ long startTime = System.currentTimeMillis();
+
this.changedVertexCnt = 0;
vertices.startSuperstep();
- ExecutorService executor = Executors.newCachedThreadPool();
+ ExecutorService executor = Executors.newFixedThreadPool((peer
+ .getNumCurrentMessages() / conf.getInt(
+ "hama.graph.threadpool.percentage", 20)) + 1);
+ long loopStartTime = System.currentTimeMillis();
while (currentMessage != null) {
- Runnable worker = new ComputeRunnable(vertices.get((V) currentMessage
- .getVertexId()), (Iterable<M>) currentMessage.getIterableMessages());
+ Runnable worker = new ComputeRunnable(currentMessage);
executor.execute(worker);
currentMessage = peer.getCurrentMessage();
}
+ LOG.info("Total time spent for superstep-" + peer.getSuperstepCount()
+ + " looping: " + (System.currentTimeMillis() - loopStartTime)
+ + " ms");
executor.shutdown();
while (!executor.isTerminated()) {
@@ -263,11 +272,18 @@ public final class GraphJobRunner<V exte
}
}
- vertices.finishSuperstep();
-
getAggregationRunner().sendAggregatorValues(peer,
vertices.getComputedVertices().size(), this.changedVertexCnt);
this.iteration++;
+
+ LOG.info("Total time spent for superstep-" + peer.getSuperstepCount()
+ + " computing vertices: " + (System.currentTimeMillis() - startTime)
+ + " ms");
+
+ startTime = System.currentTimeMillis();
+ finishSuperstep();
+ LOG.info("Total time spent for superstep-" + peer.getSuperstepCount()
+ " synchronizing: " + (System.currentTimeMillis() - startTime) + " ms");
}
/**
@@ -280,28 +296,36 @@ public final class GraphJobRunner<V exte
this.changedVertexCnt = 0;
vertices.startSuperstep();
- ExecutorService executor = Executors.newCachedThreadPool();
+ ExecutorService executor = Executors
+ .newFixedThreadPool((vertices.size() / conf.getInt(
+ "hama.graph.threadpool.percentage", 20)) + 1);
for (Vertex<V, E, M> v : vertices.getValues()) {
- Runnable worker = new ComputeRunnable(v, null);
+ Runnable worker = new ComputeRunnable(v);
executor.execute(worker);
}
+
executor.shutdown();
while (!executor.isTerminated()) {
}
- vertices.finishSuperstep();
getAggregationRunner().sendAggregatorValues(peer, 1,
this.changedVertexCnt);
iteration++;
+ finishSuperstep();
}
class ComputeRunnable implements Runnable {
Vertex<V, E, M> vertex;
Iterable<M> msgs;
- public ComputeRunnable(Vertex<V, E, M> vertex, Iterable<M> msgs) {
- this.vertex = vertex;
- this.msgs = msgs;
+ @SuppressWarnings("unchecked")
+ public ComputeRunnable(GraphJobMessage msg) {
+ this.vertex = vertices.get((V) msg.getVertexId());
+ this.msgs = (Iterable<M>) getIterableMessages(msg.getValuesBytes(), msg.getNumOfValues());
+ }
+
+ public ComputeRunnable(Vertex<V, E, M> v) {
+ this.vertex = v;
}
@Override
@@ -350,6 +374,16 @@ public final class GraphJobRunner<V exte
VerticesInfo.class);
vertices = ReflectionUtils.newInstance(verticesInfoClass);
vertices.init(this, conf, peer.getTaskId());
+
+ final String combinerName = conf.get(Constants.COMBINER_CLASS);
+ if (combinerName != null) {
+ try {
+ combiner = (Combiner<Writable>) ReflectionUtils
+ .newInstance(combinerName);
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
}
@SuppressWarnings("unchecked")
@@ -381,9 +415,6 @@ public final class GraphJobRunner<V exte
private void loadVertices(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException, SyncException, InterruptedException {
- // kryo.register(GraphJobRunner
- // .<V, E, M> newVertexInstance(VERTEX_CLASS).getClass());
-
VertexInputReader<Writable, Writable, V, E, M> reader =
(VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
.newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER,
VertexInputReader.class));
@@ -409,9 +440,9 @@ public final class GraphJobRunner<V exte
executor.execute(worker);
} else {
if (!messages.containsKey(dstHost)) {
- messages.put(dstHost, new GraphJobMessage(vertex));
+ messages.put(dstHost, new GraphJobMessage(serialize(vertex)));
} else {
- messages.get(dstHost).add(vertex);
+ messages.get(dstHost).add(serialize(vertex));
}
}
}
@@ -424,7 +455,7 @@ public final class GraphJobRunner<V exte
}
messages.clear();
messages = null;
-
+
peer.sync();
GraphJobMessage msg;
@@ -596,18 +627,87 @@ public final class GraphJobRunner<V exte
private void finishRemovals() throws IOException {
vertices.finishRemovals();
- // finish the "superstep" because we have written a new file here
- vertices.finishSuperstep();
}
private void finishAdditions() throws IOException {
vertices.finishAdditions();
- // finish the "superstep" because we have written a new file here
+ }
+
+ private final ConcurrentNavigableMap<V, GraphJobMessage> storage = new ConcurrentSkipListMap<V, GraphJobMessage>();
+
+ public void sendMessage(V vertexID, byte[] msg) throws IOException {
+ if (storage.containsKey(vertexID)) {
+ storage.get(vertexID).add(msg);
+ } else {
+ storage.put(vertexID, new GraphJobMessage(vertexID, msg));
+ }
+ }
+
+ public void finishSuperstep() throws IOException {
vertices.finishSuperstep();
+
+ for (Map.Entry<V, GraphJobMessage> m : storage.entrySet()) {
+ // Combining messages
+ if (combiner != null) {
+ if (m.getValue().getNumOfValues() > 1) {
+ peer.send(
+ getHostName(m.getKey()),
+ new GraphJobMessage(m.getKey(), serialize(combiner
+ .combine(getIterableMessages(m.getValue().getValuesBytes(), m
+ .getValue().getNumOfValues())))));
+ } else {
+ peer.send(getHostName(m.getKey()), m.getValue());
+ }
+ } else {
+ peer.send(getHostName(m.getKey()), m.getValue());
+ }
+ }
+
+ storage.clear();
}
- public void sendMessage(V vertexID, M msg) throws IOException {
- peer.send(getHostName(vertexID), new GraphJobMessage(vertexID, msg));
+ public static byte[] serialize(Writable writable) throws IOException {
+ ByteArrayOutputStream a = new ByteArrayOutputStream();
+ DataOutputStream b = new DataOutputStream(a);
+ writable.write(b);
+
+ return a.toByteArray();
+ }
+
+ public Iterable<Writable> getIterableMessages(final byte[] valuesBytes,
+ final int numOfValues) {
+
+ return new Iterable<Writable>() {
+ @Override
+ public Iterator<Writable> iterator() {
+ return new Iterator<Writable>() {
+ ByteArrayInputStream bis = new ByteArrayInputStream(valuesBytes);
+ DataInputStream dis = new DataInputStream(bis);
+ int index = 0;
+
+ @Override
+ public boolean hasNext() {
+ return (index < numOfValues) ? true : false;
+ }
+
+ @Override
+ public Writable next() {
+ Writable v = createVertexValue();
+ try {
+ v.readFields(dis);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ index++;
+ return v;
+ }
+
+ @Override
+ public void remove() {
+ }
+ };
+ }
+ };
}
/**
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java?rev=1674779&r1=1674778&r2=1674779&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java Mon Apr 20 10:18:36 2015
@@ -17,6 +17,8 @@
*/
package org.apache.hama.graph;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.conf.Configuration;
@@ -31,7 +33,8 @@ public class IncomingVertexMessageManage
private Configuration conf;
- private final MessagePerVertex msgPerVertex = new MessagePerVertex();
+ @SuppressWarnings("rawtypes")
+ private final ConcurrentHashMap<WritableComparable, GraphJobMessage> storage = new ConcurrentHashMap<WritableComparable, GraphJobMessage>();
private final ConcurrentLinkedQueue<GraphJobMessage> mapMessages = new
ConcurrentLinkedQueue<GraphJobMessage>();
@Override
@@ -66,7 +69,11 @@ public class IncomingVertexMessageManage
@Override
public void add(GraphJobMessage item) {
if (item.isVertexMessage()) {
- msgPerVertex.add(item.getVertexId(), item);
+ if (storage.containsKey(item.getVertexId())) {
+ storage.get(item.getVertexId()).addValuesBytes(item.getValuesBytes(), item.size());
+ } else {
+ storage.put(item.getVertexId(), item);
+ }
} else {
mapMessages.add(item);
}
@@ -75,21 +82,32 @@ public class IncomingVertexMessageManage
@Override
public void clear() {
mapMessages.clear();
- msgPerVertex.clear();
+ storage.clear();
}
+ Iterator<GraphJobMessage> it;
+
@Override
public GraphJobMessage poll() {
if (mapMessages.size() > 0) {
return mapMessages.poll();
} else {
- return msgPerVertex.pollFirstEntry();
+ if(it == null) {
+ it = storage.values().iterator();
+ }
+
+ if(it.hasNext()) {
+ return it.next();
+ } else {
+ storage.clear();
+ return null;
+ }
}
}
@Override
public int size() {
- return msgPerVertex.size() + mapMessages.size();
+ return storage.size() + mapMessages.size();
}
// empty, not needed to implement
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java?rev=1674779&r1=1674778&r2=1674779&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java Mon Apr 20 10:18:36 2015
@@ -46,8 +46,8 @@ public final class MapVerticesInfo<V ext
implements VerticesInfo<V, E, M> {
private final Map<V, Vertex<V, E, M>> vertices = new ConcurrentHashMap<V,
Vertex<V, E, M>>();
- private Set<V> computedVertices;
-
+ private Set<V> computedVertices = new HashSet<V>();
+
@Override
public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
TaskAttemptID attempt) throws IOException {
@@ -55,9 +55,9 @@ public final class MapVerticesInfo<V ext
@Override
public void put(Vertex<V, E, M> vertex) throws IOException {
- if(vertices.containsKey(vertex.getVertexID())) {
- for(Edge<V, E> e : vertex.getEdges())
- vertices.get(vertex.getVertexID()).addEdge(e);
+ if (vertices.containsKey(vertex.getVertexID())) {
+ for (Edge<V, E> e : vertex.getEdges())
+ vertices.get(vertex.getVertexID()).addEdge(e);
} else {
vertices.put(vertex.getVertexID(), vertex);
}
@@ -74,9 +74,9 @@ public final class MapVerticesInfo<V ext
@Override
public Collection<Vertex<V, E, M>> getValues() {
- return vertices.values();
+ return vertices.values();
}
-
+
@Override
public int size() {
return vertices.size();
@@ -90,8 +90,9 @@ public final class MapVerticesInfo<V ext
@Override
public Iterator<Vertex<V, E, M>> iterator() {
- final Iterator<Vertex<V, E, M>> vertexIterator = vertices.values().iterator();
-
+ final Iterator<Vertex<V, E, M>> vertexIterator = vertices.values()
+ .iterator();
+
return new Iterator<Vertex<V, E, M>>() {
@Override
@@ -133,18 +134,18 @@ public final class MapVerticesInfo<V ext
@Override
public void startSuperstep() throws IOException {
- computedVertices = new HashSet<V>();
}
@Override
public void finishSuperstep() throws IOException {
+ computedVertices.clear();
}
@Override
public Set<V> getComputedVertices() {
return this.computedVertices;
}
-
+
public Set<V> getNotComputedVertices() {
return Sets.difference(vertices.keySet(), computedVertices);
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java?rev=1674779&r1=1674778&r2=1674779&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java Mon Apr 20 10:18:36 2015
@@ -18,106 +18,39 @@
package org.apache.hama.graph;
import java.net.InetSocketAddress;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMessageBundle;
-import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.message.AbstractOutgoingMessageManager;
-import org.apache.hama.util.ReflectionUtils;
public class OutgoingVertexMessageManager<M extends Writable> extends
AbstractOutgoingMessageManager<GraphJobMessage> {
protected static final Log LOG = LogFactory
.getLog(OutgoingVertexMessageManager.class);
- private Combiner<Writable> combiner;
- private HashMap<InetSocketAddress, MessagePerVertex> storage = new HashMap<InetSocketAddress, MessagePerVertex>();
-
- @SuppressWarnings("unchecked")
@Override
public void init(HamaConfiguration conf) {
this.conf = conf;
-
- final String combinerName = conf.get(Constants.COMBINER_CLASS);
- if (combinerName != null) {
- try {
- combiner = (Combiner<Writable>) ReflectionUtils
- .newInstance(combinerName);
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
- }
}
@Override
public void addMessage(String peerName, GraphJobMessage msg) {
InetSocketAddress targetPeerAddress = getSocketAddress(peerName);
- if (msg.isVertexMessage()) {
- WritableComparable<?> vertexID = msg.getVertexId();
-
- if (!storage.containsKey(targetPeerAddress)) {
- storage.put(targetPeerAddress, new MessagePerVertex());
- }
-
- MessagePerVertex msgPerVertex = storage.get(targetPeerAddress);
- msgPerVertex.add(vertexID, msg);
-
- // Combining messages
- if (combiner != null && msgPerVertex.get(vertexID).getNumOfValues() > 1) {
-
- // Overwrite
- storage.get(targetPeerAddress).put(
- vertexID,
- new GraphJobMessage(vertexID, combiner.combine(msgPerVertex.get(
- vertexID).getIterableMessages())));
- }
- } else {
- outgoingBundles.get(targetPeerAddress).addMessage(msg);
- }
+ outgoingBundles.get(targetPeerAddress).addMessage(msg);
}
@Override
public void clear() {
outgoingBundles.clear();
- storage.clear();
}
@Override
public Iterator<Entry<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>>
getBundleIterator() {
- return new Iterator<Entry<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>>() {
- final Iterator<Entry<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>> bundles = outgoingBundles
- .entrySet().iterator();
-
- @Override
- public boolean hasNext() {
- return bundles.hasNext();
- }
-
- @Override
- public Entry<InetSocketAddress, BSPMessageBundle<GraphJobMessage>> next() {
- Entry<InetSocketAddress, BSPMessageBundle<GraphJobMessage>> bundle = bundles
- .next();
-
- MessagePerVertex msgStorage = storage.get(bundle.getKey());
- if (msgStorage != null) {
- bundle.getValue().addMessages(msgStorage.getMessages());
- }
- storage.remove(bundle.getKey());
- return bundle;
- }
-
- @Override
- public void remove() {
- }
-
- };
+ return outgoingBundles.entrySet().iterator();
}
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1674779&r1=1674778&r2=1674779&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Mon Apr 20 10:18:36 2015
@@ -76,19 +76,20 @@ public abstract class Vertex<V extends W
@Override
public void sendMessage(Edge<V, E> e, M msg) throws IOException {
- runner.sendMessage(e.getDestinationVertexID(), msg);
+ runner.sendMessage(e.getDestinationVertexID(), GraphJobRunner.serialize(msg));
}
@Override
public void sendMessage(V destinationVertexID, M msg) throws IOException {
- runner.sendMessage(destinationVertexID, msg);
+ runner.sendMessage(destinationVertexID, GraphJobRunner.serialize(msg));
}
@Override
public void sendMessageToNeighbors(M msg) throws IOException {
final List<Edge<V, E>> outEdges = this.getEdges();
+ byte[] serialized = GraphJobRunner.serialize(msg);
for (Edge<V, E> e : outEdges) {
- sendMessage(e, msg);
+ runner.sendMessage(e.getDestinationVertexID(), serialized);
}
}
Modified:
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java?rev=1674779&r1=1674778&r2=1674779&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java Mon Apr 20 10:18:36 2015
@@ -17,6 +17,7 @@
*/
package org.apache.hama.graph;
+import java.io.IOException;
import java.util.List;
import java.util.PriorityQueue;
@@ -32,7 +33,7 @@ import com.google.common.collect.Lists;
public class TestGraphJobMessage extends TestCase {
@Test
- public void testPriorityQueue() {
+ public void testPriorityQueue() throws IOException {
PriorityQueue<GraphJobMessage> prio = new PriorityQueue<GraphJobMessage>();
prio.addAll(getMessages());
@@ -53,14 +54,14 @@ public class TestGraphJobMessage extends
assertTrue(prio.isEmpty());
}
- public List<GraphJobMessage> getMessages() {
+ public List<GraphJobMessage> getMessages() throws IOException {
GraphJobMessage mapMsg = new GraphJobMessage(new MapWritable());
GraphJobMessage vertexMsg1 = new GraphJobMessage(new Text("1"),
- new IntWritable());
+ GraphJobRunner.serialize(new IntWritable()));
GraphJobMessage vertexMsg2 = new GraphJobMessage(new Text("2"),
- new IntWritable());
+ GraphJobRunner.serialize(new IntWritable()));
GraphJobMessage vertexMsg3 = new GraphJobMessage(new Text("3"),
- new IntWritable());
+ GraphJobRunner.serialize(new IntWritable()));
return Lists.newArrayList(mapMsg, vertexMsg1, vertexMsg2, vertexMsg3);
}
Modified: hama/trunk/mesos/pom.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/mesos/pom.xml?rev=1674779&r1=1674778&r2=1674779&view=diff
==============================================================================
--- hama/trunk/mesos/pom.xml (original)
+++ hama/trunk/mesos/pom.xml Mon Apr 20 10:18:36 2015
@@ -96,7 +96,7 @@
<outputDirectory>${project.parent.basedir}/lib</outputDirectory>
</artifactItem>
</artifactItems>
- <excludeTransitive>true</excludeTransitive>
+ <excludeTransitive>false</excludeTransitive>
<fileMode>755</fileMode>
</configuration>
</execution>
Modified: hama/trunk/pom.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/pom.xml?rev=1674779&r1=1674778&r2=1674779&view=diff
==============================================================================
--- hama/trunk/pom.xml (original)
+++ hama/trunk/pom.xml Mon Apr 20 10:18:36 2015
@@ -92,6 +92,7 @@
<commons-lang.version>2.6</commons-lang.version>
<commons-httpclient.version>3.0.1</commons-httpclient.version>
<commons-io.version>2.4</commons-io.version>
+ <commons-collections.version>3.2.1</commons-collections.version>
<commons-compress.version>1.9</commons-compress.version>
<hadoop.version>1.2.0</hadoop.version>
<protobuf.version>2.5.0</protobuf.version>
@@ -269,6 +270,11 @@
<version>${commons-httpclient.version}</version>
</dependency>
<dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <version>${commons-collections.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>${jetty.version}</version>