Author: tjungblut
Date: Fri Jun 1 20:01:40 2012
New Revision: 1345327
URL: http://svn.apache.org/viewvc?rev=1345327&view=rev
Log:
Enable out of core messaging for bipartite matching in graph module
Modified:
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
Modified:
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1345327&r1=1345326&r2=1345327&view=diff
==============================================================================
---
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Fri Jun 1 20:01:40 2012
@@ -69,6 +69,7 @@ public final class GraphJobRunner<VERTEX
private Configuration conf;
private Combiner<VERTEX_VALUE> combiner;
+ private Partitioner<VERTEX_ID, VERTEX_VALUE> partitioner;
// multiple aggregator arrays
private Aggregator<VERTEX_VALUE, Vertex<VERTEX_ID, VERTEX_VALUE,
EDGE_VALUE_TYPE>>[] aggregators;
@@ -92,11 +93,10 @@ public final class GraphJobRunner<VERTEX
private int maxIteration = -1;
private long iteration;
- // aimed to be accessed by vertex writables to serialize stuff
- Class<VERTEX_ID> vertexIdClass;
- Class<VERTEX_VALUE> vertexValueClass;
- Class<EDGE_VALUE_TYPE> edgeValueClass;
- Class<Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>> vertexClass;
+ private Class<VERTEX_ID> vertexIdClass;
+ private Class<VERTEX_VALUE> vertexValueClass;
+ private Class<EDGE_VALUE_TYPE> edgeValueClass;
+ private Class<Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>> vertexClass;
@Override
@SuppressWarnings("unchecked")
@@ -125,7 +125,7 @@ public final class GraphJobRunner<VERTEX
boolean repairNeeded = conf.getBoolean(GRAPH_REPAIR, false);
boolean runtimePartitioning = conf.getBoolean(
GraphJob.VERTEX_GRAPH_RUNTIME_PARTIONING, true);
- Partitioner<VERTEX_ID, VERTEX_VALUE> partitioner = (Partitioner<VERTEX_ID, VERTEX_VALUE>) ReflectionUtils
+ partitioner = (Partitioner<VERTEX_ID, VERTEX_VALUE>) ReflectionUtils
.newInstance(
conf.getClass("bsp.input.partitioner.class",
HashPartitioner.class),
conf);
@@ -389,7 +389,7 @@ public final class GraphJobRunner<VERTEX
boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex =
newVertexInstance(
vertexClass, conf);
- vertex.peer = peer;
+ vertex.setPeer(peer);
vertex.runner = this;
while (true) {
KeyValuePair<Writable, Writable> next = peer.readNext();
@@ -424,7 +424,7 @@ public final class GraphJobRunner<VERTEX
vertices.put(vertex.getVertexID(), vertex);
}
vertex = newVertexInstance(vertexClass, conf);
- vertex.peer = peer;
+ vertex.setPeer(peer);
vertex.runner = this;
}
@@ -434,7 +434,7 @@ public final class GraphJobRunner<VERTEX
while ((msg = peer.getCurrentMessage()) != null) {
Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> messagedVertex =
(Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>) msg
.getVertex();
- messagedVertex.peer = peer;
+ messagedVertex.setPeer(peer);
messagedVertex.runner = this;
messagedVertex.setup(conf);
vertices.put(messagedVertex.getVertexID(), messagedVertex);
@@ -466,7 +466,7 @@ public final class GraphJobRunner<VERTEX
if (!vertices.containsKey(vertexName)) {
Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> newVertex =
newVertexInstance(
vertexClass, conf);
- newVertex.peer = peer;
+ newVertex.setPeer(peer);
newVertex.setVertexID(vertexName);
newVertex.runner = this;
if (selfReference) {
@@ -543,6 +543,10 @@ public final class GraphJobRunner<VERTEX
return maxIteration;
}
+ public Partitioner<VERTEX_ID, VERTEX_VALUE> getPartitioner() {
+ return partitioner;
+ }
+
public final Writable getLastAggregatedValue(int index) {
return globalAggregatorResult[index];
}
Modified:
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1345327&r1=1345326&r2=1345327&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
(original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
Fri Jun 1 20:01:40 2012
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.Partitioner;
public abstract class Vertex<ID_TYPE extends Writable, MSG_TYPE extends
Writable, EDGE_VALUE_TYPE extends Writable>
implements VertexInterface<ID_TYPE, MSG_TYPE, EDGE_VALUE_TYPE> {
@@ -32,7 +33,7 @@ public abstract class Vertex<ID_TYPE ext
private ID_TYPE vertexID;
private MSG_TYPE value;
protected GraphJobRunner<ID_TYPE, MSG_TYPE, EDGE_VALUE_TYPE> runner;
- protected BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
+ private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
private List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> edges;
public Configuration getConf() {
@@ -64,6 +65,15 @@ public abstract class Vertex<ID_TYPE ext
}
@Override
+ public void sendMessage(ID_TYPE destinationVertexID, MSG_TYPE msg)
+ throws IOException {
+ int partition = getPartitioner().getPartition(destinationVertexID, msg,
+ peer.getNumPeers());
+ String destPeer = peer.getAllPeerNames()[partition];
+ peer.send(destPeer, new GraphJobMessage(destinationVertexID, msg));
+ }
+
+ @Override
public long getSuperstepCount() {
return runner.getNumberIterations();
}
@@ -131,6 +141,22 @@ public abstract class Vertex<ID_TYPE ext
return peer.getNumPeers();
}
+ /**
+ * Gives access to the BSP primitives and additional features by a peer.
+ */
+ public BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> getPeer() {
+ return peer;
+ }
+
+ public Partitioner<ID_TYPE, MSG_TYPE> getPartitioner() {
+ return runner.getPartitioner();
+ }
+
+ void setPeer(
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
+ this.peer = peer;
+ }
+
@Override
public long getNumVertices() {
return runner.getNumberVertices();
Modified:
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1345327&r1=1345326&r2=1345327&view=diff
==============================================================================
---
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
(original)
+++
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
Fri Jun 1 20:01:40 2012
@@ -40,39 +40,55 @@ public interface VertexInterface<ID_TYPE
*/
public void setup(Configuration conf);
- /** @return the unique identification for the vertex. */
+ /**
+ * @return the unique identification for the vertex.
+ */
public ID_TYPE getVertexID();
- /** @return the number of vertices in the input graph. */
+ /**
+ * @return the number of vertices in the input graph.
+ */
public long getNumVertices();
- /** The user-defined function */
+ /**
+ * The user-defined function
+ */
public void compute(Iterator<MSG_TYPE> messages) throws IOException;
- /** @return a list of outgoing edges of this vertex in the input graph. */
+ /**
+ * @return a list of outgoing edges of this vertex in the input graph.
+ */
public List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> getEdges();
- /** Sends a message to another vertex. */
+ /**
+ * Sends a message to another vertex.
+ */
public void sendMessage(Edge<ID_TYPE, EDGE_VALUE_TYPE> e, MSG_TYPE msg)
throws IOException;
- /** Sends a message to neighbors */
+ /**
+ * Sends a message to neighbors
+ */
public void sendMessageToNeighbors(MSG_TYPE msg) throws IOException;
- /** @return the superstep number of the current superstep (starting from 0). */
+ /**
+ * Sends a message to the given destination vertex by ID and the message value
+ */
+ public void sendMessage(ID_TYPE destinationVertexID, MSG_TYPE msg)
+ throws IOException;
+
+ /**
+ * @return the superstep number of the current superstep (starting from 0).
+ */
public long getSuperstepCount();
/**
* Sets the vertex value
- *
- * @param value
*/
public void setValue(MSG_TYPE value);
/**
* Gets the vertex value
- *
- * @return value
*/
public MSG_TYPE getValue();