Author: edwardyoon
Date: Tue Feb 3 00:28:07 2015
New Revision: 1656609
URL: http://svn.apache.org/r1656609
Log:
HAMA-919: Manage messages per Vertex
Added:
hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
Removed:
hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/pom.xml
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java
hama/trunk/graph/pom.xml
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Tue Feb 3 00:28:07 2015
@@ -19,6 +19,7 @@ Release 0.7.0 (unreleased changes)
IMPROVEMENTS
+ HAMA-919: Manage messages per Vertex (edwardyoon)
HAMA-923: add a toString() method for FloatArrayWritable and
TextArrayWritable classes (edwardyoon)
HAMA-921: Polish doSuperstep() function and VertexMessageIterable class
(Anastasis Andronidis)
HAMA-913: Add RPC implementation using netty(bsmin)
Modified: hama/trunk/core/pom.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/core/pom.xml?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
--- hama/trunk/core/pom.xml (original)
+++ hama/trunk/core/pom.xml Tue Feb 3 00:28:07 2015
@@ -136,8 +136,14 @@
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
- <groupId>com.esotericsoftware.kryo</groupId>
- <artifactId>kryo</artifactId>
+ <groupId>org.apache.directmemory</groupId>
+ <artifactId>directmemory-cache</artifactId>
+ <version>0.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.directmemory</groupId>
+ <artifactId>directmemory-kryo</artifactId>
+ <version>0.2</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
Modified:
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
(original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java Tue
Feb 3 00:28:07 2015
@@ -100,7 +100,9 @@ public class TestPersistQueue extends Te
}
int cnt = 0;
- while ((peer.getCurrentMessage()) != null) {
+ IntWritable result = null;
+ while ((result = peer.getCurrentMessage()) != null) {
+ System.out.println(result);
cnt++;
}
Modified:
hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
---
hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java
(original)
+++
hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java
Tue Feb 3 00:28:07 2015
@@ -33,7 +33,6 @@ import org.apache.hama.bsp.TextOutputFor
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.GraphJobRunner.GraphJobCounter;
-import org.apache.hama.graph.MapVerticesInfo;
import org.apache.hama.graph.Vertex;
import org.apache.hama.graph.VertexInputReader;
@@ -142,10 +141,6 @@ public class DynamicGraph {
private static GraphJob createJob(String[] args, HamaConfiguration conf)
throws IOException {
- // NOTE: Graph modification APIs can be used only with MapVerticesInfo.
- conf.set("hama.graph.vertices.info",
- "org.apache.hama.graph.MapVerticesInfo");
-
GraphJob graphJob = new GraphJob(conf, DynamicGraph.class);
graphJob.setJobName("Dynamic Graph");
graphJob.setVertexClass(GraphVertex.class);
Modified: hama/trunk/graph/pom.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/pom.xml?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
--- hama/trunk/graph/pom.xml (original)
+++ hama/trunk/graph/pom.xml Tue Feb 3 00:28:07 2015
@@ -42,16 +42,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.directmemory</groupId>
- <artifactId>directmemory-cache</artifactId>
- <version>0.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.directmemory</groupId>
- <artifactId>directmemory-kryo</artifactId>
- <version>0.2</version>
- </dependency>
- <dependency>
<groupId>org.apache.hama</groupId>
<artifactId>hama-core</artifactId>
<version>${project.version}</version>
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Tue Feb
3 00:28:07 2015
@@ -33,7 +33,6 @@ import org.apache.hama.bsp.PartitioningR
import org.apache.hama.bsp.message.MessageManager;
import org.apache.hama.bsp.message.OutgoingMessageManager;
import org.apache.hama.bsp.message.queue.MessageQueue;
-import org.apache.hama.bsp.message.queue.SortedMemoryQueue;
import com.google.common.base.Preconditions;
@@ -58,8 +57,8 @@ public class GraphJob extends BSPJob {
throws IOException {
super(conf);
conf.setClass(MessageManager.OUTGOING_MESSAGE_MANAGER_CLASS,
- OutgoingVertexMessagesManager.class, OutgoingMessageManager.class);
-
+ OutgoingVertexMessageManager.class, OutgoingMessageManager.class);
+
this.setBoolean(Constants.PARTITION_SORT_BY_KEY, true);
this.setBspClass(GraphJobRunner.class);
this.setJarByClass(exampleClass);
@@ -128,8 +127,8 @@ public class GraphJob extends BSPJob {
/**
* Sets the input reader for parsing the input to vertices.
*/
- public void setVertexInputReaderClass(@SuppressWarnings("rawtypes")
- Class<? extends VertexInputReader> cls) {
+ public void setVertexInputReaderClass(
+ @SuppressWarnings("rawtypes") Class<? extends VertexInputReader> cls) {
ensureState(JobState.DEFINE);
conf.setClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER, cls,
RecordConverter.class);
@@ -140,8 +139,8 @@ public class GraphJob extends BSPJob {
* Sets the output writer for materializing vertices to the output sink. If
* not set, the default DefaultVertexOutputWriter will be used.
*/
- public void setVertexOutputWriterClass(@SuppressWarnings("rawtypes")
- Class<? extends VertexOutputWriter> cls) {
+ public void setVertexOutputWriterClass(
+ @SuppressWarnings("rawtypes") Class<? extends VertexOutputWriter> cls) {
ensureState(JobState.DEFINE);
conf.setClass(VERTEX_OUTPUT_WRITER_CLASS_ATTR, cls,
VertexOutputWriter.class);
@@ -154,8 +153,8 @@ public class GraphJob extends BSPJob {
}
@Override
- public void setPartitioner(@SuppressWarnings("rawtypes")
- Class<? extends Partitioner> theClass) {
+ public void setPartitioner(
+ @SuppressWarnings("rawtypes") Class<? extends Partitioner> theClass) {
super.setPartitioner(theClass);
conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
}
@@ -201,9 +200,8 @@ public class GraphJob extends BSPJob {
this.setVertexOutputWriterClass(DefaultVertexOutputWriter.class);
}
- // add the default message queue to the sorted one
this.getConfiguration().setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
- SortedMemoryQueue.class, MessageQueue.class);
+ IncomingVertexMessageManager.class, MessageQueue.class);
super.submit();
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
Tue Feb 3 00:28:07 2015
@@ -17,9 +17,15 @@
*/
package org.apache.hama.graph;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.IntWritable;
@@ -46,10 +52,14 @@ public final class GraphJobMessage imple
private MapWritable map;
@SuppressWarnings("rawtypes")
private WritableComparable vertexId;
- private Writable vertexValue;
- private IntWritable verticesSize;
+ private IntWritable integerMessage;
private static GraphJobMessageComparator comparator;
+ private int numOfValues = 0;
+
+ private final ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
+ private final DataOutputStream bufferDos = new DataOutputStream(byteBuffer);
+
static {
if (comparator == null) {
comparator = new GraphJobMessageComparator();
@@ -61,6 +71,13 @@ public final class GraphJobMessage imple
public GraphJobMessage() {
}
+ public byte[] serialize(Writable message) throws IOException {
+ ByteArrayOutputStream mbos = new ByteArrayOutputStream();
+ DataOutputStream mdos = new DataOutputStream(mbos);
+ message.write(mdos);
+ return mbos.toByteArray();
+ }
+
public GraphJobMessage(MapWritable map) {
this.flag = MAP_FLAG;
this.map = map;
@@ -69,12 +86,20 @@ public final class GraphJobMessage imple
public GraphJobMessage(WritableComparable<?> vertexId, Writable vertexValue)
{
this.flag = VERTEX_FLAG;
this.vertexId = vertexId;
- this.vertexValue = vertexValue;
+
+ add(vertexValue);
+ }
+
+ public GraphJobMessage(WritableComparable<?> vertexId, List<Writable>
values) {
+ this.flag = VERTEX_FLAG;
+ this.vertexId = vertexId;
+
+ addAll(values);
}
public GraphJobMessage(IntWritable size) {
this.flag = VERTICES_SIZE_FLAG;
- this.verticesSize = size;
+ this.integerMessage = size;
}
@Override
@@ -84,11 +109,14 @@ public final class GraphJobMessage imple
// we don't need to write the classes because the other side has the same
// classes for the two entities.
vertexId.write(out);
- vertexValue.write(out);
+
+ out.writeInt(numOfValues);
+ out.writeInt(byteBuffer.size());
+ out.write(byteBuffer.toByteArray());
} else if (isMapMessage()) {
map.write(out);
} else if (isVerticesSizeMessage()) {
- verticesSize.write(out);
+ integerMessage.write(out);
} else {
vertexId.write(out);
}
@@ -107,8 +135,8 @@ public final class GraphJobMessage imple
map = new MapWritable();
map.readFields(in);
} else if (isVerticesSizeMessage()) {
- verticesSize = new IntWritable();
- verticesSize.readFields(in);
+ integerMessage = new IntWritable();
+ integerMessage.readFields(in);
} else {
vertexId = ReflectionUtils.newInstance(GraphJobRunner.VERTEX_ID_CLASS,
null);
@@ -122,14 +150,19 @@ public final class GraphJobMessage imple
if (isVertexMessage()) {
vertexId = GraphJobRunner.createVertexIDObject();
vertexId.readFields(in);
- vertexValue = GraphJobRunner.createVertexValue();
- vertexValue.readFields(in);
+
+ this.numOfValues = in.readInt();
+ int bytesLength = in.readInt();
+ byte[] temp = new byte[bytesLength];
+ in.readFully(temp);
+ bufferDos.write(temp);
+ bufferDos.flush();
} else if (isMapMessage()) {
map = new MapWritable();
map.readFields(in);
} else if (isVerticesSizeMessage()) {
- verticesSize = new IntWritable();
- verticesSize.readFields(in);
+ integerMessage = new IntWritable();
+ integerMessage.readFields(in);
} else {
vertexId = ReflectionUtils.newInstance(GraphJobRunner.VERTEX_ID_CLASS,
null);
@@ -161,12 +194,53 @@ public final class GraphJobMessage imple
return vertexId;
}
- public Writable getVertexValue() {
- return vertexValue;
+ private ByteArrayInputStream bis = null;
+ private DataInputStream dis = null;
+
+ public List<Writable> getVertexValue() {
+ bis = new ByteArrayInputStream(byteBuffer.toByteArray());
+ dis = new DataInputStream(bis);
+
+ List<Writable> valuesCache = new ArrayList<Writable>();
+
+ for (int i = 0; i < numOfValues; i++) {
+ try {
+ Writable v = GraphJobRunner.createVertexValue();
+ v.readFields(dis);
+ valuesCache.add(v);
+ } catch (IOException e) {
+ System.out.println(i + ", " + numOfValues);
+ e.printStackTrace();
+ }
+ }
+
+ return valuesCache;
+ }
+
+ public void add(Writable value) {
+ try {
+ bufferDos.write(serialize(value));
+ numOfValues++;
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ public void addAll(List<Writable> values) {
+ for (Writable v : values)
+ add(v);
+ }
+
+ /**
+ * @return the number of values
+ */
+ public int size() {
+ return this.numOfValues;
}
public IntWritable getVerticesSize() {
- return verticesSize;
+ return integerMessage;
}
public boolean isMapMessage() {
@@ -184,14 +258,14 @@ public final class GraphJobMessage imple
@Override
public String toString() {
if (isVertexMessage()) {
- return "ID: " + vertexId + " Val: " + vertexValue;
+ return "ID: " + vertexId + " Val: " + numOfValues;
} else if (isMapMessage()) {
return "Map: " + map;
} else if (isVerticesSizeMessage()) {
- return "#Vertices: " + verticesSize;
+ return "#Vertices: " + integerMessage;
} else {
return "GraphJobMessage [flag=" + flag + ", map=" + map + ", vertexId="
- + vertexId + ", vertexValue=" + vertexValue + "]";
+ + vertexId + ", vertexValue=" + numOfValues + "]";
}
}
@@ -233,4 +307,5 @@ public final class GraphJobMessage imple
return compare(key1, key2); // compare them
}
}
+
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Tue Feb 3 00:28:07 2015
@@ -19,7 +19,12 @@ package org.apache.hama.graph;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map.Entry;
+import java.util.Set;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -39,12 +44,11 @@ import org.apache.hama.bsp.PartitioningR
import org.apache.hama.bsp.PartitioningRunner.RecordConverter;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.commons.util.KeyValuePair;
-import org.apache.hama.graph.IDSkippingIterator.Strategy;
import org.apache.hama.util.ReflectionUtils;
/**
* Fully generic graph job runner.
- *
+ *
* @param <V> the id type of a vertex.
* @param <E> the value type of an edge.
* @param <M> the value type of a vertex.
@@ -89,6 +93,7 @@ public final class GraphJobRunner<V exte
public static Class<Vertex<?, ?, ?>> vertexClass;
private VerticesInfo<V, E, M> vertices;
+
private boolean updated = true;
private int globalUpdateCounts = 0;
private int changedVertexCnt = 0;
@@ -134,7 +139,7 @@ public final class GraphJobRunner<V exte
// note that the messages must be parsed here
GraphJobMessage firstVertexMessage = parseMessages(peer);
// master/slaves needs to update
- firstVertexMessage = doAggregationUpdates(firstVertexMessage, peer);
+ doAggregationUpdates(peer);
// check if updated changed by our aggregators
if (!updated) {
break;
@@ -153,6 +158,7 @@ public final class GraphJobRunner<V exte
/**
* Just write <ID as Writable, Value as Writable> pair as a result. Note that
* this will also be executed when failure happened.
+ *
* @param peer
* @throws java.io.IOException
*/
@@ -161,11 +167,11 @@ public final class GraphJobRunner<V exte
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
vertexOutputWriter.setup(conf);
- IDSkippingIterator<V, E, M> skippingIterator = vertices.skippingIterator();
- while (skippingIterator.hasNext()) {
- vertexOutputWriter.write(skippingIterator.next(), peer);
+ Iterator<Vertex<V, E, M>> iterator = vertices.iterator();
+ while (iterator.hasNext()) {
+ vertexOutputWriter.write(iterator.next(), peer);
}
- vertices.cleanup(conf, peer.getTaskId());
+ vertices.clear();
}
/**
@@ -173,8 +179,7 @@ public final class GraphJobRunner<V exte
* master aggregation. In case of no aggregators defined, we save a sync by
* reading multiple typed messages.
*/
- private GraphJobMessage doAggregationUpdates(
- GraphJobMessage firstVertexMessage,
+ private void doAggregationUpdates(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException, SyncException, InterruptedException {
@@ -197,114 +202,92 @@ public final class GraphJobRunner<V exte
peer.send(peerName, new GraphJobMessage(updatedCnt));
}
}
+
if (getAggregationRunner().isEnabled()) {
- // in case we need to sync, we need to replay the messages that already
- // are added to the queue. This prevents loosing messages when using
- // aggregators.
- if (firstVertexMessage != null) {
- peer.send(peer.getPeerName(), firstVertexMessage);
- }
- GraphJobMessage msg;
- while ((msg = peer.getCurrentMessage()) != null) {
- peer.send(peer.getPeerName(), msg);
- }
- // now sync
peer.sync();
// now the map message must be read that might be send from the master
updated = getAggregationRunner().receiveAggregatedValues(
peer.getCurrentMessage().getMap(), iteration);
- // set the first vertex message back to the message it had before sync
- firstVertexMessage = peer.getCurrentMessage();
}
- return firstVertexMessage;
}
+ private Set<V> notComputedVertices;
+
/**
* Do the main logic of a superstep, namely checking if vertices are active,
- * feeding compute with messages and controlling combiners/aggregators.
- * We iterate over our messages and vertices in sorted order. That means
- * that we need to seek the first vertex that has the same ID as the
- * iterated message.
+ * feeding compute with messages and controlling combiners/aggregators. We
+ * iterate over our messages and vertices in sorted order. That means that we
+ * need to seek the first vertex that has the same ID as the iterated
message.
*/
+ @SuppressWarnings("unchecked")
private void doSuperstep(GraphJobMessage currentMessage,
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
int activeVertices = 0;
this.changedVertexCnt = 0;
- this.vertices.startSuperstep();
-
- IDSkippingIterator<V, E, M> iterator = this.vertices.skippingIterator();
- VertexMessages<V, M> queueMessages = new VertexMessages<V, M>(peer);
- queueMessages.prependMessage(currentMessage);
-
- // note that can't skip inactive vertices because we have to rewrite the
- // complete vertex file in each iteration
- V firstVID = currentMessage == null ? null : (V)
currentMessage.getVertexId();
- while (iterator.hasNext(firstVID, Strategy.ALL)) {
- Vertex<V, E, M> vertex = iterator.next();
- boolean msgsExist = queueMessages.continueWith(vertex.getVertexID());
-
- if (!msgsExist) checkMsgOrder(vertex.getVertexID(), queueMessages);
-
- if (msgsExist && vertex.isHalted()) {
+ vertices.startSuperstep();
+
+ notComputedVertices = new HashSet();
+ notComputedVertices.addAll(vertices.keySet());
+
+ List<M> msgs = null;
+ Vertex<V, E, M> vertex = null;
+
+ while (currentMessage != null) {
+ vertex = vertices.get((V) currentMessage.getVertexId());
+
+ msgs = (List<M>) currentMessage.getVertexValue();
+ if (vertex.isHalted()) {
vertex.setActive();
}
if (!vertex.isHalted()) {
- vertex.compute(queueMessages);
+ vertex.compute(msgs);
+ notComputedVertices.remove(vertex.getVertexID());
activeVertices++;
}
- // Dump remaining messages
- queueMessages.dumpRest();
+ currentMessage = peer.getCurrentMessage();
+ vertices.finishVertexComputation(vertex);
+ }
- // note that we even need to rewrite the vertex if it is halted for
- // consistency reasons
- this.vertices.finishVertexComputation(vertex);
+ for (V v : notComputedVertices) {
+ vertex = vertices.get(v);
+ if (!vertex.isHalted()) {
+ vertex.compute(Collections.<M> emptyList());
+ vertices.finishVertexComputation(vertex);
+ activeVertices++;
+ }
}
- this.vertices.finishSuperstep();
+ vertices.finishSuperstep();
getAggregationRunner().sendAggregatorValues(peer, activeVertices,
this.changedVertexCnt);
this.iteration++;
}
/**
- * Utility that ensures that the incoming messages have a target vertex.
- */
- private void checkMsgOrder(V vid, VertexMessages<V, M> vm) {
- // When the vid is greater than the current message, it means that a vertex
- // has sent a message to an other vertex that doesn't exist
- if (vm.getMessageVID() != null && vm.getMessageVID().compareTo(vid) < 0) {
- if (conf.getBoolean("hama.check.missing.vertex", true)) {
- throw new IllegalArgumentException(
- "A message has recieved with a destination ID: " +
vm.getMessageVID()
- + " that does not exist! (Vertex iterator is at" + vid + "
ID)");
- } else {
- // Skip all unrecognized messages until we find a match
- vm.continueUntil(vid);
- }
- }
- }
-
- /**
* Seed the vertices first with their own values in compute. This is the
first
* superstep after the vertices have been loaded.
*/
private void doInitialSuperstep(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
- vertices.startSuperstep();
this.changedVertexCnt = 0;
- IDSkippingIterator<V, E, M> skippingIterator = vertices.skippingIterator();
- while (skippingIterator.hasNext()) {
- Vertex<V, E, M> vertex = skippingIterator.next();
+ vertices.startSuperstep();
+
+ Iterator<Vertex<V, E, M>> iterator = vertices.iterator();
+
+ while (iterator.hasNext()) {
+ Vertex<V, E, M> vertex = iterator.next();
// Calls setup method.
vertex.setup(conf);
+
vertex.compute(Collections.singleton(vertex.getValue()));
vertices.finishVertexComputation(vertex);
}
+
vertices.finishSuperstep();
getAggregationRunner().sendAggregatorValues(peer, 1,
this.changedVertexCnt);
iteration++;
@@ -335,7 +318,7 @@ public final class GraphJobRunner<V exte
getAggregationRunner().setupAggregators(peer);
Class<? extends VerticesInfo<V, E, M>> verticesInfoClass = (Class<?
extends VerticesInfo<V, E, M>>) conf
- .getClass("hama.graph.vertices.info", ListVerticesInfo.class,
+ .getClass("hama.graph.vertices.info", MapVerticesInfo.class,
VerticesInfo.class);
vertices = ReflectionUtils.newInstance(verticesInfoClass);
vertices.init(this, conf, peer.getTaskId());
@@ -396,14 +379,15 @@ public final class GraphJobRunner<V exte
}
} else {
if (vertex.compareTo(currentVertex) > 0) {
- throw new IOException("The records of split aren't in order by
vertex ID.");
+ throw new IOException(
+ "The records of split aren't in order by vertex ID.");
}
if (selfReference) {
vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
}
- vertices.addVertex(vertex);
+ addVertex(vertex);
vertex = currentVertex;
}
}
@@ -412,11 +396,7 @@ public final class GraphJobRunner<V exte
if (selfReference) {
vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
}
- vertices.addVertex(vertex);
-
- vertices.finishAdditions();
- // finish the "superstep" because we have written a new file here
- vertices.finishSuperstep();
+ addVertex(vertex);
LOG.info(vertices.size() + " vertices are loaded into "
+ peer.getPeerName());
@@ -425,56 +405,32 @@ public final class GraphJobRunner<V exte
/**
* Add new vertex into memory of each peer.
- *
+ *
* @throws IOException
*/
private void addVertex(Vertex<V, E, M> vertex) throws IOException {
- vertex.setRunner(this);
- vertex.setup(conf);
-
if (conf.getBoolean("hama.graph.self.ref", false)) {
vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
}
-
+ vertices.put(vertex);
+
LOG.debug("Added VertexID: " + vertex.getVertexID() + " in peer "
+ peer.getPeerName());
- vertices.addVertex(vertex);
}
/**
* Remove vertex from this peer.
- *
+ *
* @throws IOException
*/
private void removeVertex(V vertexID) {
- vertices.removeVertex(vertexID);
+ vertices.remove(vertexID);
+
LOG.debug("Removed VertexID: " + vertexID + " in peer "
+ peer.getPeerName());
}
/**
- * After all inserts are done, we must finalize the VertexInfo data
structure.
- *
- * @throws IOException
- */
- private void finishAdditions() throws IOException {
- vertices.finishAdditions();
- // finish the "superstep" because we have written a new file here
- vertices.finishSuperstep();
- }
-
- /**
- * After all inserts are done, we must finalize the VertexInfo data
structure.
- *
- * @throws IOException
- */
- private void finishRemovals() throws IOException {
- vertices.finishRemovals();
- // finish the "superstep" because we have written a new file here
- vertices.finishSuperstep();
- }
-
- /**
* Counts vertices globally by sending the count of vertices in the map to
the
* other peers.
*/
@@ -502,7 +458,7 @@ public final class GraphJobRunner<V exte
/**
* Parses the messages in every superstep and does actions according to flags
* in the messages.
- *
+ *
* @return the first vertex message, null if none received.
*/
@SuppressWarnings("unchecked")
@@ -512,7 +468,7 @@ public final class GraphJobRunner<V exte
GraphJobMessage msg = null;
boolean dynamicAdditions = false;
boolean dynamicRemovals = false;
-
+
while ((msg = peer.getCurrentMessage()) != null) {
// either this is a vertex message or a directive that must be read
// as map
@@ -563,7 +519,7 @@ public final class GraphJobRunner<V exte
}
}
-
+
// If we applied any changes to vertices, we need to call finishAdditions
// and finishRemovals in the end.
if (dynamicAdditions) {
@@ -576,6 +532,18 @@ public final class GraphJobRunner<V exte
return msg;
}
+ private void finishRemovals() throws IOException {
+ vertices.finishRemovals();
+ // finish the "superstep" because we have written a new file here
+ vertices.finishSuperstep();
+ }
+
+ private void finishAdditions() throws IOException {
+ vertices.finishAdditions();
+ // finish the "superstep" because we have written a new file here
+ vertices.finishSuperstep();
+ }
+
/**
* @return the number of vertices, globally accumulated.
*/
@@ -607,7 +575,7 @@ public final class GraphJobRunner<V exte
/**
* Gets the last aggregated value at the given index. The index is dependend
* on how the aggregators were configured during job setup phase.
- *
+ *
* @return the value of the aggregator, or null if none was defined.
*/
public final Writable getLastAggregatedValue(int index) {
@@ -617,7 +585,7 @@ public final class GraphJobRunner<V exte
/**
* Gets the last aggregated number of vertices at the given index. The index
* is dependend on how the aggregators were configured during job setup
phase.
- *
+ *
* @return the value of the aggregator, or null if none was defined.
*/
public final IntWritable getNumLastAggregatedVertices(int index) {
Added:
hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java?rev=1656609&view=auto
==============================================================================
---
hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
(added)
+++
hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
Tue Feb 3 00:28:07 2015
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.message.queue.MessageQueue;
+import org.apache.hama.bsp.message.queue.SynchronizedQueue;
+
+public class IncomingVertexMessageManager<M extends WritableComparable<M>>
+ implements SynchronizedQueue<GraphJobMessage> {
+
+ private Configuration conf;
+
+ private final MessagePerVertex msgPerVertex = new MessagePerVertex();
+ private final ConcurrentLinkedQueue<GraphJobMessage> mapMessages = new
ConcurrentLinkedQueue<GraphJobMessage>();
+
+ @Override
+ public Iterator<GraphJobMessage> iterator() {
+ return msgPerVertex.iterator();
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void addAll(Iterable<GraphJobMessage> col) {
+ for (GraphJobMessage m : col)
+ add(m);
+ }
+
+ @Override
+ public void addAll(MessageQueue<GraphJobMessage> otherqueue) {
+ GraphJobMessage poll = null;
+ while ((poll = otherqueue.poll()) != null) {
+ add(poll);
+ }
+ }
+
+ @Override
+ public void add(GraphJobMessage item) {
+ if (item.isVertexMessage()) {
+ msgPerVertex.add(item.getVertexId(), item.getVertexValue());
+ } else if (item.isMapMessage() || item.isVerticesSizeMessage()) {
+ mapMessages.add(item);
+ }
+ }
+
+ @Override
+ public void clear() {
+ msgPerVertex.clear();
+ }
+
+ @Override
+ public GraphJobMessage poll() {
+ if (mapMessages.size() > 0) {
+ return mapMessages.poll();
+ } else {
+ return msgPerVertex.pollFirstEntry();
+ }
+ }
+
+ @Override
+ public int size() {
+ return msgPerVertex.size();
+ }
+
+ // empty, not needed to implement
+
+ @Override
+ public void init(Configuration conf, TaskAttemptID id) {
+
+ }
+
+ @Override
+ public void close() {
+ this.clear();
+ }
+
+ @Override
+ public void prepareRead() {
+
+ }
+
+ @Override
+ public void prepareWrite() {
+
+ }
+
+ @Override
+ public boolean isMessageSerialized() {
+ return false;
+ }
+
+ @Override
+ public boolean isMemoryBasedQueue() {
+ return true;
+ }
+
+ @Override
+ public MessageQueue<GraphJobMessage> getMessageQueue() {
+ return this;
+ }
+
+}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
Tue Feb 3 00:28:07 2015
@@ -18,15 +18,15 @@
package org.apache.hama.graph;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Iterator;
-import java.util.SortedMap;
-import java.util.TreeMap;
+import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.TaskAttemptID;
-import org.apache.hama.util.KryoSerializer;
/**
* Stores the vertices into a memory-based tree map. This implementation allows
@@ -41,9 +41,8 @@ import org.apache.hama.util.KryoSerializ
public final class MapVerticesInfo<V extends WritableComparable<V>, E extends
Writable, M extends Writable>
implements VerticesInfo<V, E, M> {
private GraphJobRunner<V, E, M> runner;
- Vertex<V, E, M> v;
- private final SortedMap<V, byte[]> verticesMap = new TreeMap<V, byte[]>();
+ private final Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E,
M>>();
@Override
public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
@@ -52,98 +51,67 @@ public final class MapVerticesInfo<V ext
}
@Override
- public void addVertex(Vertex<V, E, M> vertex) throws IOException {
- if (verticesMap.containsKey(vertex.getVertexID())) {
- throw new UnsupportedOperationException("Vertex with ID: "
- + vertex.getVertexID() + " already exists!");
- } else {
- verticesMap.put(vertex.getVertexID(), serialize(vertex));
- }
+ public void put(Vertex<V, E, M> vertex) throws IOException {
+ vertices.put(vertex.getVertexID(), vertex);
}
@Override
- public void removeVertex(V vertexID) throws UnsupportedOperationException {
- if (verticesMap.containsKey(vertexID)) {
- verticesMap.remove(vertexID);
- } else {
- throw new UnsupportedOperationException("Vertex with ID: " + vertexID
- + " not found on this peer.");
- }
+ public void remove(V vertexID) throws UnsupportedOperationException {
+ vertices.remove(vertexID);
}
public void clear() {
- verticesMap.clear();
+ vertices.clear();
}
@Override
public int size() {
- return this.verticesMap.size();
+ return vertices.size();
}
@Override
- public IDSkippingIterator<V, E, M> skippingIterator() {
- return new IDSkippingIterator<V, E, M>() {
- Iterator<V> it = verticesMap.keySet().iterator();
+ public Vertex<V, E, M> get(V vertexID) {
+ Vertex<V, E, M> vertex = vertices.get(vertexID);
+ vertex.setRunner(runner);
+ return vertex;
+ }
+
+ @Override
+ public Iterator<Vertex<V, E, M>> iterator() {
+
+ final Iterator<Vertex<V, E, M>> vertexIterator =
vertices.values().iterator();
+
+ return new Iterator<Vertex<V, E, M>>() {
@Override
- public boolean hasNext(V msgId,
- org.apache.hama.graph.IDSkippingIterator.Strategy strat)
- throws IOException {
-
- if (it.hasNext()) {
- V vertexID = it.next();
- v = deserialize(vertexID, verticesMap.get(vertexID));
-
- while (!strat.accept(v, msgId)) {
- if (it.hasNext()) {
- vertexID = it.next();
- v = deserialize(vertexID, verticesMap.get(vertexID));
- } else {
- return false;
- }
- }
-
- return true;
- } else {
- v = null;
- return false;
- }
+ public boolean hasNext() {
+ return vertexIterator.hasNext();
}
@Override
public Vertex<V, E, M> next() {
- if (v == null) {
- throw new UnsupportedOperationException(
- "You must invoke hasNext before ask for the next vertex.");
- }
-
- Vertex<V, E, M> tmp = v;
- v = null;
- return tmp;
+ Vertex<V, E, M> vertex = vertexIterator.next();
+ vertex.setRunner(runner);
+ return vertex;
}
- };
- }
+ @Override
+ public void remove() {
+ // TODO Auto-generated method stub
+ }
- private final KryoSerializer kryo = new
KryoSerializer(GraphJobRunner.VERTEX_CLASS);
-
- public byte[] serialize(Vertex<V, E, M> vertex) throws IOException {
- return kryo.serialize(vertex);
+ };
}
- @SuppressWarnings("unchecked")
- public Vertex<V, E, M> deserialize(V vertexID, byte[] serialized)
- throws IOException {
- v = (Vertex<V, E, M>) kryo.deserialize(serialized);
- v.setRunner(runner);
- v.setVertexID(vertexID);
- return v;
+ @Override
+ public Set<V> keySet() {
+ return vertices.keySet();
}
@Override
public void finishVertexComputation(Vertex<V, E, M> vertex)
throws IOException {
- verticesMap.put(vertex.getVertexID(), serialize(vertex));
+ vertices.put(vertex.getVertexID(), vertex);
}
@Override
@@ -155,17 +123,11 @@ public final class MapVerticesInfo<V ext
}
@Override
- public void finishSuperstep() {
+ public void startSuperstep() throws IOException {
}
@Override
- public void cleanup(HamaConfiguration conf, TaskAttemptID attempt)
- throws IOException {
-
+ public void finishSuperstep() throws IOException {
}
- @Override
- public void startSuperstep() throws IOException {
-
- }
}
Added:
hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java?rev=1656609&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java
(added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MessagePerVertex.java
Tue Feb 3 00:28:07 2015
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+public class MessagePerVertex {
+
+ @SuppressWarnings("rawtypes")
+ private final ConcurrentNavigableMap<WritableComparable, GraphJobMessage>
storage = new ConcurrentSkipListMap<WritableComparable, GraphJobMessage>();
+
+ public int size() {
+ return storage.size();
+ }
+
+ public void clear() {
+ storage.clear();
+ }
+
+ @SuppressWarnings("rawtypes")
+ public void put(WritableComparable vertexId, GraphJobMessage
graphJobMessage) {
+ storage.put(vertexId, graphJobMessage);
+ }
+
+ @SuppressWarnings("rawtypes")
+ public void add(WritableComparable vertexID, List<Writable> values) {
+ if (storage.containsKey(vertexID)) {
+ storage.get(vertexID).addAll(values);
+ } else {
+ put(vertexID, new GraphJobMessage(vertexID, values));
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ public boolean containsKey(WritableComparable vertexID) {
+ return storage.containsKey(vertexID);
+ }
+
+ @SuppressWarnings("rawtypes")
+ public GraphJobMessage get(WritableComparable vertexID) {
+ return storage.get(vertexID);
+ }
+
+ public Iterator<GraphJobMessage> iterator() {
+ return storage.values().iterator();
+ }
+
+ public GraphJobMessage pollFirstEntry() {
+ return (storage.size() > 0) ? storage.pollFirstEntry().getValue() : null;
+ }
+
+}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
---
hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
(original)
+++
hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
Tue Feb 3 00:28:07 2015
@@ -31,6 +31,7 @@ import org.apache.hama.util.ReflectionUt
import java.io.IOException;
import java.util.Iterator;
+import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
/**
@@ -65,7 +66,7 @@ public class OffHeapVerticesInfo<V exten
ReflectionUtils.newInstance(conf.getClass(DM_SERIALIZER,
KryoSerializer.class, Serializer.class)))
.setDisposalTime(conf.getInt(DM_DISPOSAL_TIME, 3600000));
- if (conf.getBoolean(DM_SORTED, true)) {
+ if (conf.getBoolean(DM_SORTED, false)) {
dm.setMap(new ConcurrentSkipListMap<V, Pointer<Vertex<V, E, M>>>());
} else {
dm.setInitialCapacity(conf.getInt(DM_CAPACITY, 1000))
@@ -77,85 +78,73 @@ public class OffHeapVerticesInfo<V exten
}
@Override
- public void cleanup(HamaConfiguration conf, TaskAttemptID attempt)
- throws IOException {
- vertices.dump();
- }
-
- @Override
- public void addVertex(Vertex<V, E, M> vertex) {
+ public void put(Vertex<V, E, M> vertex) {
vertices.put(vertex.getVertexID(), vertex);
}
- @Override
- public void finishAdditions() {
+ public void clear() {
+ vertices.clear();
}
@Override
- public void startSuperstep() throws IOException {
+ public int size() {
+ return (int) this.vertices.entries();
}
@Override
- public void finishSuperstep() throws IOException {
+ public void remove(V vertexID) {
+ vertices.free(vertexID);
}
@Override
- public void finishVertexComputation(Vertex<V, E, M> vertex)
- throws IOException {
- vertices.put(vertex.getVertexID(), vertex);
- }
-
- public void clear() {
- vertices.clear();
- }
+ public Vertex<V, E, M> get(V vertexID) {
+ Vertex<V, E, M> vertex = vertices.retrieve(vertexID);
+ vertex.setRunner(runner);
- @Override
- public int size() {
- return (int) this.vertices.entries();
+ return vertex;
}
@Override
- public IDSkippingIterator<V, E, M> skippingIterator() {
+ public Iterator<Vertex<V, E, M>> iterator() {
final Iterator<Vertex<V, E, M>> vertexIterator = new
CacheValuesIterable<V, Vertex<V, E, M>>(
vertices, strict).iterator();
- return new IDSkippingIterator<V, E, M>() {
- int currentIndex = 0;
-
- Vertex<V, E, M> currentVertex = null;
+ return new Iterator<Vertex<V, E, M>>() {
@Override
- public boolean hasNext(V e,
- org.apache.hama.graph.IDSkippingIterator.Strategy strat) {
- if (currentIndex < vertices.entries()) {
-
- Vertex<V, E, M> next = vertexIterator.next();
- while (!strat.accept(next, e)) {
- currentIndex++;
- }
- currentVertex = next;
- return true;
- } else {
- return false;
- }
+ public boolean hasNext() {
+ return vertexIterator.hasNext();
}
@Override
public Vertex<V, E, M> next() {
- currentIndex++;
- if (currentVertex != null && currentVertex.getRunner() == null) {
- currentVertex.setRunner(runner);
- }
- return currentVertex;
+ Vertex<V, E, M> vertex = vertexIterator.next();
+ vertex.setRunner(runner);
+
+ return vertex;
+ }
+
+ @Override
+ public void remove() {
+ // TODO Auto-generated method stub
}
};
+ }
+ @Override
+ public Set<V> keySet() {
+ return vertices.getMap().keySet();
}
@Override
- public void removeVertex(V vertexID) {
- vertices.free(vertexID);
+ public void finishVertexComputation(Vertex<V, E, M> vertex)
+ throws IOException {
+ vertices.put(vertex.getVertexID(), vertex);
+ }
+
+ @Override
+ public void finishAdditions() {
}
@Override
@@ -163,4 +152,12 @@ public class OffHeapVerticesInfo<V exten
vertices.collectExpired();
}
+ @Override
+ public void startSuperstep() throws IOException {
+ }
+
+ @Override
+ public void finishSuperstep() throws IOException {
+ }
+
}
Added:
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java?rev=1656609&view=auto
==============================================================================
---
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
(added)
+++
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
Tue Feb 3 00:28:07 2015
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.Combiner;
+import org.apache.hama.bsp.message.OutgoingMessageManager;
+import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
+import org.apache.hama.util.BSPNetUtils;
+
+public class OutgoingVertexMessageManager<M extends Writable> implements
+ OutgoingMessageManager<GraphJobMessage> {
+ protected static final Log LOG = LogFactory
+ .getLog(OutgoingVertexMessageManager.class);
+
+ private HamaConfiguration conf;
+ private BSPMessageCompressor<GraphJobMessage> compressor;
+ private Combiner<Writable> combiner;
+ private final HashMap<String, InetSocketAddress> peerSocketCache = new
HashMap<String, InetSocketAddress>();
+ private HashMap<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>
outgoingBundles = new HashMap<InetSocketAddress,
BSPMessageBundle<GraphJobMessage>>();
+
+ private HashMap<InetSocketAddress, MessagePerVertex> storage = new
HashMap<InetSocketAddress, MessagePerVertex>();
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(HamaConfiguration conf,
+ BSPMessageCompressor<GraphJobMessage> compressor) {
+ this.conf = conf;
+ this.compressor = compressor;
+ if (!conf.getClass(Constants.COMBINER_CLASS, Combiner.class).equals(
+ Combiner.class)) {
+ LOG.debug("Combiner class: " + conf.get(Constants.COMBINER_CLASS));
+
+ combiner = (Combiner<Writable>) org.apache.hadoop.util.ReflectionUtils
+ .newInstance(conf.getClass(Constants.COMBINER_CLASS, Combiner.class),
+ conf);
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void addMessage(String peerName, GraphJobMessage msg) {
+ InetSocketAddress targetPeerAddress = getSocketAddress(peerName);
+
+ if (msg.isVertexMessage()) {
+ WritableComparable vertexID = msg.getVertexId();
+
+ if (!storage.containsKey(targetPeerAddress)) {
+ storage.put(targetPeerAddress, new MessagePerVertex());
+ }
+
+ MessagePerVertex msgPerVertex = storage.get(targetPeerAddress);
+ msgPerVertex.add(vertexID, msg.getVertexValue());
+
+ // Combining messages
+ if (combiner != null
+ && msgPerVertex.get(vertexID).getVertexValue().size() > 1) {
+ storage.get(targetPeerAddress).put(
+ vertexID,
+ new GraphJobMessage(vertexID, combiner.combine(msgPerVertex.get(
+ vertexID).getVertexValue())));
+ }
+
+ } else {
+ outgoingBundles.get(targetPeerAddress).addMessage(msg);
+ }
+ }
+
+ private InetSocketAddress getSocketAddress(String peerName) {
+ InetSocketAddress targetPeerAddress = null;
+ // Get socket for target peer.
+ if (peerSocketCache.containsKey(peerName)) {
+ targetPeerAddress = peerSocketCache.get(peerName);
+ } else {
+ targetPeerAddress = BSPNetUtils.getAddress(peerName);
+ peerSocketCache.put(peerName, targetPeerAddress);
+ }
+
+ if (!outgoingBundles.containsKey(targetPeerAddress)) {
+ BSPMessageBundle<GraphJobMessage> bundle = new
BSPMessageBundle<GraphJobMessage>();
+ if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
+ bundle.setCompressor(compressor,
+ conf.getLong("hama.messenger.compression.threshold", 128));
+ }
+ outgoingBundles.put(targetPeerAddress, bundle);
+ }
+ return targetPeerAddress;
+ }
+
+ @Override
+ public void clear() {
+ outgoingBundles.clear();
+ storage.clear();
+ }
+
+ @Override
+ public Iterator<Entry<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>>
getBundleIterator() {
+ return new Iterator<Entry<InetSocketAddress,
BSPMessageBundle<GraphJobMessage>>>() {
+ final Iterator<Entry<InetSocketAddress,
BSPMessageBundle<GraphJobMessage>>> bundles = outgoingBundles
+ .entrySet().iterator();
+
+ @Override
+ public boolean hasNext() {
+ return bundles.hasNext();
+ }
+
+ @Override
+ public Entry<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>
next() {
+ Entry<InetSocketAddress, BSPMessageBundle<GraphJobMessage>> bundle =
bundles
+ .next();
+
+ MessagePerVertex msgStorage = storage.get(bundle.getKey());
+ if (msgStorage != null) {
+ Iterator<GraphJobMessage> it = msgStorage.iterator();
+ while (it.hasNext()) {
+ bundle.getValue().addMessage(it.next());
+ }
+ }
+
+ storage.remove(bundle.getKey());
+ return bundle;
+ }
+
+ @Override
+ public void remove() {
+ // TODO Auto-generated method stub
+ }
+
+ };
+ }
+
+}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java Tue
Feb 3 00:28:07 2015
@@ -18,6 +18,8 @@
package org.apache.hama.graph;
import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -41,22 +43,43 @@ public interface VerticesInfo<V extends
TaskAttemptID attempt) throws IOException;
/**
- * Cleanup of internal structures.
+ * Add a vertex to the underlying structure.
*/
- public void cleanup(HamaConfiguration conf, TaskAttemptID attempt)
- throws IOException;
+ public void put(Vertex<V, E, M> vertex) throws IOException;
+ public Vertex<V, E, M> get(V vertexID);
+
/**
- * Add a vertex to the underlying structure.
+ * Remove a vertex to the underlying structure.
*/
- public void addVertex(Vertex<V, E, M> vertex) throws IOException;
+ public void remove(V vertexID) throws UnsupportedOperationException;
/**
- * Remove a vertex to the underlying structure.
+ * @return the number of vertices added to the underlying structure.
+ * Implementations should take care this is a constant time
operation.
*/
- public void removeVertex(V vertexID) throws UnsupportedOperationException;
+ public int size();
/**
+ * Must be called once a vertex is guaranteed not to change any more and can
+ * safely be persisted to a secondary storage.
+ */
+ public void finishVertexComputation(Vertex<V, E, M> vertex)
+ throws IOException;
+
+ /**
+ * @return the iterator.
+ */
+ public Iterator<Vertex<V, E, M>> iterator();
+
+ public void clear();
+
+ /**
+ * @return the set of vertex IDs
+ */
+ public Set<V> keySet();
+
+ /**
* Finish the additions, from this point on the implementations should close
* the adds and throw exceptions in case something is added after this call.
*/
@@ -78,18 +101,4 @@ public interface VerticesInfo<V extends
*/
public void finishSuperstep() throws IOException;
- /**
- * Must be called once a vertex is guaranteed not to change any more and can
- * safely be persisted to a secondary storage.
- */
- public void finishVertexComputation(Vertex<V, E, M> vertex)
- throws IOException;
-
- /**
- * @return the number of vertices added to the underlying structure.
- * Implementations should take care this is a constant time
operation.
- */
- public int size();
-
- public IDSkippingIterator<V, E, M> skippingIterator();
}
Modified:
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1656609&r1=1656608&r2=1656609&view=diff
==============================================================================
---
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
(original)
+++
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
Tue Feb 3 00:28:07 2015
@@ -61,8 +61,7 @@ public class TestSubmitGraphJob extends
@Before
public void setUp() throws Exception {
super.setUp();
- vi.add(ListVerticesInfo.class);
- vi.add(DiskVerticesInfo.class);
+ vi.add(MapVerticesInfo.class);
vi.add(OffHeapVerticesInfo.class);
}
@@ -120,7 +119,7 @@ public class TestSubmitGraphJob extends
@SuppressWarnings("rawtypes")
protected void injectVerticesInfo() {
Class<? extends VerticesInfo> verticesInfoClass = vi.get(Math
- .abs(new Random().nextInt() % 3));
+ .abs(new Random().nextInt() % 2));
LOG.info("using vertices info of type : " + verticesInfoClass.getName());
}