Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java?rev=1340131&r1=1340130&r2=1340131&view=diff ============================================================================== --- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java (original) +++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java Fri May 18 15:35:28 2012 @@ -23,12 +23,14 @@ public class MinAggregator extends Abstr int min = Integer.MAX_VALUE; + @Override public void aggregate(IntWritable value) { if (value.get() < min) { min = value.get(); } } + @Override public IntWritable getValue() { return new IntWritable(min); }
Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1340131&r1=1340130&r2=1340131&view=diff ============================================================================== --- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original) +++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Fri May 18 15:35:28 2012 @@ -23,24 +23,24 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hama.bsp.BSPPeer; -public abstract class Vertex<M extends Writable> implements VertexInterface<M> { +public abstract class Vertex<ID_TYPE extends Writable, MSG_TYPE extends Writable, EDGE_VALUE_TYPE extends Writable> + implements VertexInterface<ID_TYPE, MSG_TYPE, EDGE_VALUE_TYPE> { - private M value; - private String vertexID; - protected GraphJobRunner runner; - protected BSPPeer<?, ?, ?, ?, MapWritable> peer; - public List<Edge> edges; + private MSG_TYPE value; + private ID_TYPE vertexID; + protected GraphJobRunner<ID_TYPE, MSG_TYPE, EDGE_VALUE_TYPE> runner; + protected BSPPeer<VertexWritable<ID_TYPE, MSG_TYPE>, VertexArrayWritable, Writable, Writable, Writable> peer; + public List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> edges; public Configuration getConf() { return peer.getConfiguration(); } @Override - public String getVertexID() { + public ID_TYPE getVertexID() { return vertexID; } @@ -49,17 +49,17 @@ public abstract class Vertex<M extends W } @Override - public void sendMessage(Edge e, M msg) throws IOException { + public void sendMessage(Edge<ID_TYPE, EDGE_VALUE_TYPE> e, MSG_TYPE msg) + throws IOException { MapWritable message = new MapWritable(); - message.put(new Text(e.getName()), msg); - - peer.send(e.getDestVertexID(), message); + message.put(e.getDestinationVertexID(), msg); + peer.send(e.getDestinationPeerName(), message); } @Override - public void sendMessageToNeighbors(M msg) throws IOException { - final List<Edge> outEdges = this.getOutEdges(); - for (Edge e : outEdges) { + public void sendMessageToNeighbors(MSG_TYPE msg) throws IOException { + final List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> outEdges = this.getOutEdges(); + for (Edge<ID_TYPE, EDGE_VALUE_TYPE> e : outEdges) { sendMessage(e, msg); } } @@ -70,21 +70,21 @@ public abstract class Vertex<M extends W } @Override - public List<Edge> getOutEdges() { + public List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> getOutEdges() { return edges; } @Override - public M getValue() { + public MSG_TYPE getValue() { return value; } @Override - public void setValue(M value) { + public void setValue(MSG_TYPE value) { this.value = value; } - public void setVertexID(String vertexID) { + public void setVertexID(ID_TYPE vertexID) { this.vertexID = vertexID; } @@ -97,8 +97,8 @@ public abstract class Vertex<M extends W * was configured or not returned a result. */ @SuppressWarnings("unchecked") - public M getLastAggregatedValue() { - return (M) runner.getLastAggregatedValue(); + public MSG_TYPE getLastAggregatedValue() { + return (MSG_TYPE) runner.getLastAggregatedValue(); } /** @@ -113,6 +113,7 @@ public abstract class Vertex<M extends W return peer.getNumPeers(); } + @Override public long getNumVertices() { return runner.getNumberVertices(); } Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1340131&r1=1340130&r2=1340131&view=diff ============================================================================== --- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original) +++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Fri May 18 15:35:28 2012 @@ -24,7 +24,16 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; -public interface VertexInterface<MSGTYPE extends Writable> { +/** + * The vertex interface. + * + * @param <ID_TYPE> this type must be writable and should also implement equals + * and hashcode. + * @param <MSG_TYPE> the type used for messaging, usually the value of a vertex. + * @param <EDGE_VALUE_TYPE> the type used for storing edge values, usually the + * value of an edge. + */ +public interface VertexInterface<ID_TYPE extends Writable, MSG_TYPE extends Writable, EDGE_VALUE_TYPE extends Writable> { /** * Used to setup a vertex. @@ -32,22 +41,23 @@ public interface VertexInterface<MSGTYPE public void setup(Configuration conf); /** @return the unique identification for the vertex. */ - public String getVertexID(); + public ID_TYPE getVertexID(); /** @return the number of vertices in the input graph. */ public long getNumVertices(); /** The user-defined function */ - public void compute(Iterator<MSGTYPE> messages) throws IOException; + public void compute(Iterator<MSG_TYPE> messages) throws IOException; /** @return a list of outgoing edges of this vertex in the input graph. */ - public List<Edge> getOutEdges(); + public List<Edge<ID_TYPE, EDGE_VALUE_TYPE>> getOutEdges(); /** Sends a message to another vertex. */ - public void sendMessage(Edge e, MSGTYPE msg) throws IOException; + public void sendMessage(Edge<ID_TYPE, EDGE_VALUE_TYPE> e, MSG_TYPE msg) + throws IOException; /** Sends a message to neighbors */ - public void sendMessageToNeighbors(MSGTYPE msg) throws IOException; + public void sendMessageToNeighbors(MSG_TYPE msg) throws IOException; /** @return the superstep number of the current superstep (starting from 0). */ public long getSuperstepCount(); @@ -57,13 +67,13 @@ public interface VertexInterface<MSGTYPE * * @param value */ - public void setValue(MSGTYPE value); + public void setValue(MSG_TYPE value); /** * Gets the vertex value * * @return value */ - public MSGTYPE getValue(); + public MSG_TYPE getValue(); } Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java?rev=1340131&r1=1340130&r2=1340131&view=diff ============================================================================== --- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java (original) +++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java Fri May 18 15:35:28 2012 @@ -21,61 +21,104 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import org.apache.hadoop.io.Writable; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.ObjectWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; -public class VertexWritable implements Writable, - WritableComparable<VertexWritable> { +public class VertexWritable<VERTEX_ID, VERTEX_VALUE> implements + WritableComparable<VertexWritable<VERTEX_ID, VERTEX_VALUE>>, Configurable { - public String name; - public int weight; + /** + * This field is static because it doesn't need to be an instance variable. It + * is written in upper case, because it is considered constant per launched + * process. + */ + public static Configuration CONFIGURATION; + + VERTEX_ID vertexId; + VERTEX_VALUE value; + Class<VERTEX_ID> idCls; + Class<VERTEX_VALUE> valCls; public VertexWritable() { super(); } - public VertexWritable(String name) { - super(); - this.name = name; - this.weight = 0; + @SuppressWarnings("unchecked") + public VertexWritable(VERTEX_ID name, Class<VERTEX_ID> idCls) { + this.vertexId = name; + this.value = (VERTEX_VALUE) new IntWritable(0); + this.idCls = idCls; + this.valCls = org.apache.hadoop.util.ReflectionUtils.getClass(value); } + @SuppressWarnings("unchecked") public VertexWritable(int weight, String name) { - super(); - this.name = name; - this.weight = weight; + this.vertexId = (VERTEX_ID) new Text(name); + this.value = (VERTEX_VALUE) new IntWritable(weight); + this.idCls = org.apache.hadoop.util.ReflectionUtils.getClass(vertexId); + this.valCls = org.apache.hadoop.util.ReflectionUtils.getClass(value); + } + + @SuppressWarnings("unchecked") + public VertexWritable(String name) { + this.vertexId = (VERTEX_ID) new Text(name); + this.value = (VERTEX_VALUE) NullWritable.get(); + this.idCls = org.apache.hadoop.util.ReflectionUtils.getClass(vertexId); + this.valCls = org.apache.hadoop.util.ReflectionUtils.getClass(value); + } + + public VertexWritable(VERTEX_VALUE weight, VERTEX_ID name, + Class<VERTEX_ID> idCls, Class<VERTEX_VALUE> valCls) { + this.vertexId = name; + this.value = weight; + this.idCls = idCls; + this.valCls = valCls; } - public String getName() { - return name; + public VERTEX_ID getVertexId() { + return vertexId; } - public int getWeight() { - return weight; + public VERTEX_VALUE getVertexValue() { + return value; } @Override public String toString() { - return getName(); + return getVertexId().toString(); } + @SuppressWarnings("unchecked") @Override public void readFields(DataInput in) throws IOException { - this.name = in.readUTF(); - this.weight = in.readInt(); + try { + idCls = (Class<VERTEX_ID>) CONFIGURATION.getClassByName(in.readUTF()); + valCls = (Class<VERTEX_VALUE>) CONFIGURATION.getClassByName(in.readUTF()); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + vertexId = (VERTEX_ID) ObjectWritable.readObject(in, CONFIGURATION); + value = (VERTEX_VALUE) ObjectWritable.readObject(in, CONFIGURATION); } @Override public void write(DataOutput out) throws IOException { - out.writeUTF(name); - out.writeInt(weight); + out.writeUTF(idCls.getName()); + out.writeUTF(valCls.getName()); + ObjectWritable.writeObject(out, vertexId, idCls, CONFIGURATION); + ObjectWritable.writeObject(out, value, valCls, CONFIGURATION); } @Override public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((name == null) ? 0 : name.hashCode()); + result = prime * result + ((vertexId == null) ? 0 : vertexId.hashCode()); return result; } @@ -87,19 +130,32 @@ public class VertexWritable implements W return false; if (getClass() != obj.getClass()) return false; - VertexWritable other = (VertexWritable) obj; - if (name == null) { - if (other.name != null) + @SuppressWarnings("unchecked") + VertexWritable<VERTEX_ID, VERTEX_VALUE> other = (VertexWritable<VERTEX_ID, VERTEX_VALUE>) obj; + if (vertexId == null) { + if (other.vertexId != null) return false; - } else if (!name.equals(other.name)) + } else if (!vertexId.equals(other.vertexId)) return false; return true; } + @SuppressWarnings("unchecked") + @Override + public int compareTo(VertexWritable<VERTEX_ID, VERTEX_VALUE> o) { + VertexWritable<VERTEX_ID, VERTEX_VALUE> that = o; + return ((Comparable<VertexWritable<VERTEX_ID, VERTEX_VALUE>>) this.vertexId) + .compareTo((VertexWritable<VERTEX_ID, VERTEX_VALUE>) that.vertexId); + } + + @Override + public void setConf(Configuration conf) { + VertexWritable.CONFIGURATION = conf; + } + @Override - public int compareTo(VertexWritable o) { - VertexWritable that = (VertexWritable) o; - return this.name.compareTo(that.name); + public Configuration getConf() { + return CONFIGURATION; } }