Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java 
(original)
+++ 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java 
Wed Nov  2 15:40:02 2011
@@ -60,8 +60,8 @@ public class VertexRange<I extends Writa
     /** Checkpoint file prefix (null if not recovering from a checkpoint) */
     private String checkpointfilePrefix = null;
     /** Vertex map for this range (keyed by index) */
-    private final SortedMap<I, Vertex<I, V, E, M>> vertexMap =
-        new TreeMap<I, Vertex<I, V, E, M>>();
+    private final SortedMap<I, BasicVertex<I, V, E, M>> vertexMap =
+        new TreeMap<I, BasicVertex<I, V, E, M>>();
     /** Class logger */
     private static final Logger LOG = Logger.getLogger(VertexRange.class);
 
@@ -199,7 +199,7 @@ public class VertexRange<I extends Writa
      *
      * @return Map of vertices (keyed by index)
      */
-    public SortedMap<I, Vertex<I, V, E, M>> getVertexMap() {
+    public SortedMap<I, BasicVertex<I, V, E, M>> getVertexMap() {
         return vertexMap;
     }
 

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java 
(original)
+++ 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java 
Wed Nov  2 15:40:02 2011
@@ -18,18 +18,19 @@
 
 package org.apache.giraph.graph;
 
-import java.io.IOException;
-
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
+import java.io.IOException;
+
 @SuppressWarnings("rawtypes")
 public interface VertexReader<
         I extends WritableComparable,
         V extends Writable,
-        E extends Writable> {
+        E extends Writable,
+        M extends Writable> {
     /**
      * Use the input split and context to setup reading the vertices.
      * Guaranteed to be called prior to any other function.
@@ -43,14 +44,20 @@ public interface VertexReader<
         throws IOException, InterruptedException;
 
     /**
-     * Reads the next vertex and associated data
      *
-     * @param vertex set the properties of this vertex
-     * @return true iff a vertex and associated data was read, false if at EOF
+     * @return false iff there are no more vertices
+     * @throws IOException
      * @throws InterruptedException
      */
-    boolean next(MutableVertex<I, V, E, ?> vertex)
-        throws IOException, InterruptedException;
+    boolean nextVertex() throws IOException, InterruptedException;
+
+    /**
+     *
+     * @return the current vertex which has been read.  nextVertex() should be called first.
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    BasicVertex<I, V, E, M> getCurrentVertex() throws IOException, 
InterruptedException;
 
     /**
      * Close this {@link VertexReader} to future operations.

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java
 (original)
+++ 
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java
 Wed Nov  2 15:40:02 2011
@@ -111,8 +111,10 @@ public class VertexResolver<I extends Wr
     }
 
     @Override
-    public MutableVertex<I, V, E, M> instantiateVertex() {
-        return BspUtils.<I, V, E, M>createVertex(getConf(), graphState);
+    public BasicVertex<I, V, E, M> instantiateVertex() {
+        BasicVertex<I, V, E, M> vertex = 
BspUtils.<I,V,E,M>createVertex(getConf());
+        vertex.setGraphState(graphState);
+        return vertex;
     }
 
     @Override

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListTextVertexOutputFormat.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListTextVertexOutputFormat.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- 
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListTextVertexOutputFormat.java
 (original)
+++ 
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListTextVertexOutputFormat.java
 Wed Nov  2 15:40:02 2011
@@ -37,6 +37,7 @@ import java.io.IOException;
  * @param <V> Vertex value
  * @param <E> Edge value
  */
+@SuppressWarnings("rawtypes")
 public class AdjacencyListTextVertexOutputFormat <I extends WritableComparable,
     V extends Writable, E extends Writable> extends TextVertexOutputFormat<I, 
V, E>{
 
@@ -54,7 +55,7 @@ public class AdjacencyListTextVertexOutp
     @Override
     public void writeVertex(BasicVertex<I, V, E, ?> vertex) throws IOException,
         InterruptedException {
-      if(delimiter == null) {
+      if (delimiter == null) {
         delimiter = getContext().getConfiguration()
            .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
       }

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- 
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java
 (original)
+++ 
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java
 Wed Nov  2 15:40:02 2011
@@ -17,9 +17,10 @@
  */
 package org.apache.giraph.lib;
 
+import com.google.common.collect.Maps;
+import org.apache.giraph.graph.BasicVertex;
 import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.MutableVertex;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -28,6 +29,7 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.mapreduce.RecordReader;
 
 import java.io.IOException;
+import java.util.Map;
 
 /**
 * VertexReader that reads lines of text with vertices encoded as adjacency
@@ -41,9 +43,10 @@ import java.io.IOException;
  * @param <V> Vertex value
  * @param <E> Edge value
  */
+@SuppressWarnings("rawtypes")
 abstract class AdjacencyListVertexReader<I extends WritableComparable,
-    V extends Writable, E extends Writable> extends
-    TextVertexInputFormat.TextVertexReader<I, V, E> {
+    V extends Writable, E extends Writable, M extends Writable> extends
+    TextVertexInputFormat.TextVertexReader<I, V, E, M> {
 
   public static final String LINE_TOKENIZE_VALUE = "adj.list.input.delimiter";
   public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
@@ -94,18 +97,17 @@ abstract class AdjacencyListVertexReader
    */
   abstract public void decodeEdge(String id, String value, Edge<I, E> edge);
 
-  /**
-   * {@inheritDoc}
-   */
+
   @Override
-  public boolean next(MutableVertex<I, V, E, ?> iveMutableVertex)
-      throws IOException, InterruptedException {
-    if (!getRecordReader().nextKeyValue()) {
-        return false;
-    }
+  public boolean nextVertex() throws IOException, InterruptedException {
+    return getRecordReader().nextKeyValue();
+  }
 
+  @Override
+  public BasicVertex<I, V, E, M> getCurrentVertex() throws IOException, 
InterruptedException {
     Configuration conf = getContext().getConfiguration();
     String line = getRecordReader().getCurrentValue().toString();
+    BasicVertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
 
     if (sanitizer != null) {
       line = sanitizer.sanitize(line);
@@ -123,20 +125,19 @@ abstract class AdjacencyListVertexReader
 
     I vertexId = BspUtils.<I>createVertexIndex(conf);
     decodeId(values[0], vertexId);
-    iveMutableVertex.setVertexId(vertexId);
 
     V value = BspUtils.<V>createVertexValue(conf);
     decodeValue(values[1], value);
-    iveMutableVertex.setVertexValue(value);
 
     int i = 2;
+    Map<I, E> edges = Maps.newHashMap();
     Edge<I, E> edge = new Edge<I, E>();
     while(i < values.length) {
       decodeEdge(values[i], values[i + 1], edge);
-      iveMutableVertex.addEdge(edge.getDestVertexId(), edge.getEdgeValue());
+      edges.put(edge.getDestVertexId(), edge.getEdgeValue());
       i += 2;
     }
-
-    return true;
+    vertex.initialize(vertexId, value, edges, null);
+    return vertex;
   }
 }

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexInputFormat.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexInputFormat.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- 
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexInputFormat.java
 (original)
+++ 
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexInputFormat.java
 Wed Nov  2 15:40:02 2011
@@ -18,10 +18,11 @@
 
 package org.apache.giraph.lib;
 
+import com.google.common.collect.Maps;
 import net.iharder.Base64;
+import org.apache.giraph.graph.BasicVertex;
 import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.Edge;
-import org.apache.giraph.graph.MutableVertex;
 import org.apache.giraph.graph.VertexReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
@@ -39,6 +40,7 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.util.Map;
 
 /**
  * Simple way to represent the structure of the graph with a JSON object.
@@ -52,8 +54,9 @@ import java.io.IOException;
  */
 @SuppressWarnings("rawtypes")
 public class JsonBase64VertexInputFormat<
-        I extends WritableComparable, V extends Writable, E extends Writable>
-        extends TextVertexInputFormat<I, V, E> implements
+        I extends WritableComparable, V extends Writable, E extends Writable,
+        M extends Writable>
+        extends TextVertexInputFormat<I, V, E, M> implements
         JsonBase64VertexFormat {
     /**
      * Simple reader that supports {@link JsonBase64VertexInputFormat}
@@ -64,25 +67,27 @@ public class JsonBase64VertexInputFormat
      */
     private static class JsonBase64VertexReader<
             I extends WritableComparable, V extends Writable,
-            E extends Writable> extends TextVertexReader<I, V, E> {
+            E extends Writable, M extends Writable> extends 
TextVertexReader<I, V, E, M> {
         /**
          * Only constructor.  Requires the LineRecordReader
          *
          * @param lineRecordReader Line record reader to read from
          */
-        public JsonBase64VertexReader(
-                RecordReader<LongWritable, Text> lineRecordReader) {
+        public JsonBase64VertexReader(RecordReader<LongWritable, Text> 
lineRecordReader) {
             super(lineRecordReader);
         }
 
         @Override
-        public boolean next(MutableVertex<I, V, E, ?> vertex)
-                throws IOException, InterruptedException {
-            if (!getRecordReader().nextKeyValue()) {
-                return false;
-            }
+        public boolean nextVertex() throws IOException, InterruptedException {
+            return getRecordReader().nextKeyValue();
+        }
 
+        @Override
+        public BasicVertex<I, V, E, M> getCurrentVertex()
+                throws IOException, InterruptedException {
             Configuration conf = getContext().getConfiguration();
+            BasicVertex<I, V, E, M> vertex = BspUtils.createVertex(conf);
+
             Text line = getRecordReader().getCurrentValue();
             JSONObject vertexObject;
             try {
@@ -93,26 +98,26 @@ public class JsonBase64VertexInputFormat
             }
             DataInput input = null;
             byte[] decodedWritable = null;
+            I vertexId = null;
             try {
                 decodedWritable = Base64.decode(
                     vertexObject.getString(VERTEX_ID_KEY));
                 input = new DataInputStream(
                     new ByteArrayInputStream(decodedWritable));
-                I vertexId = BspUtils.<I>createVertexIndex(conf);
+                vertexId = BspUtils.<I>createVertexIndex(conf);
                 vertexId.readFields(input);
-                vertex.setVertexId(vertexId);
             } catch (JSONException e) {
                 throw new IllegalArgumentException(
                     "next: Failed to get vertex id", e);
             }
+            V vertexValue = null;
             try {
                 decodedWritable = Base64.decode(
                     vertexObject.getString(VERTEX_VALUE_KEY));
                 input = new DataInputStream(
                     new ByteArrayInputStream(decodedWritable));
-                V vertexValue = BspUtils.<V>createVertexValue(conf);
+                vertexValue = BspUtils.<V>createVertexValue(conf);
                 vertexValue.readFields(input);
-                vertex.setVertexValue(vertexValue);
             } catch (JSONException e) {
                 throw new IllegalArgumentException(
                     "next: Failed to get vertex value", e);
@@ -124,6 +129,7 @@ public class JsonBase64VertexInputFormat
                 throw new IllegalArgumentException(
                     "next: Failed to get edge array", e);
             }
+            Map<I, E> edgeMap = Maps.newHashMap();
             for (int i = 0; i < edgeArray.length(); ++i) {
                 try {
                     decodedWritable =
@@ -137,17 +143,18 @@ public class JsonBase64VertexInputFormat
                 Edge<I, E> edge = new Edge<I, E>();
                 edge.setConf(getContext().getConfiguration());
                 edge.readFields(input);
-                vertex.addEdge(edge.getDestVertexId(), edge.getEdgeValue());
+                edgeMap.put(edge.getDestVertexId(), edge.getEdgeValue());
             }
-            return true;
+            vertex.initialize(vertexId, vertexValue, edgeMap, null);
+            return vertex;
         }
     }
 
     @Override
-    public VertexReader<I, V, E> createVertexReader(
+    public VertexReader<I, V, E, M> createVertexReader(
             InputSplit split,
             TaskAttemptContext context) throws IOException {
-        return new JsonBase64VertexReader<I, V, E>(
-            textInputFormat.createRecordReader(split, context));
+        return new JsonBase64VertexReader<I, V, E, 
M>(textInputFormat.createRecordReader(split,
+            context));
     }
 }

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- 
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java
 (original)
+++ 
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java
 Wed Nov  2 15:40:02 2011
@@ -21,6 +21,7 @@ import org.apache.giraph.graph.Edge;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -35,11 +36,11 @@ import java.io.IOException;
 * to represent a vertex with id 22, value of 0.1 and edges to nodes 45 and 99,
  * with values of 0.3 and 0.44, respectively.
  */
-public class LongDoubleDoubleAdjacencyListVertexInputFormat extends
-    TextVertexInputFormat<LongWritable, DoubleWritable, DoubleWritable>  {
+public class LongDoubleDoubleAdjacencyListVertexInputFormat<M extends 
Writable> extends
+    TextVertexInputFormat<LongWritable, DoubleWritable, DoubleWritable, M>  {
 
-  static class VertexReader extends
-      AdjacencyListVertexReader<LongWritable, DoubleWritable, DoubleWritable> {
+  static class VertexReader<M extends Writable> extends
+      AdjacencyListVertexReader<LongWritable, DoubleWritable, DoubleWritable, 
M> {
 
     VertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
       super(lineRecordReader);
@@ -69,8 +70,11 @@ public class LongDoubleDoubleAdjacencyLi
   }
 
   @Override
-  public org.apache.giraph.graph.VertexReader createVertexReader(InputSplit 
split,
+  public org.apache.giraph.graph.VertexReader<LongWritable,
+    DoubleWritable, DoubleWritable, M> createVertexReader(
+      InputSplit split,
       TaskAttemptContext context) throws IOException {
-    return new VertexReader(textInputFormat.createRecordReader(split, 
context));
+    return new VertexReader<M>(textInputFormat.createRecordReader(
+      split, context));
   }
 }

Added: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/SequenceFileVertexInputFormat.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/SequenceFileVertexInputFormat.java?rev=1196639&view=auto
==============================================================================
--- 
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/SequenceFileVertexInputFormat.java
 (added)
+++ 
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/SequenceFileVertexInputFormat.java
 Wed Nov  2 15:40:02 2011
@@ -0,0 +1,72 @@
+package org.apache.giraph.lib;
+
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.VertexReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+
+import java.io.IOException;
+import java.util.List;
+
+public class SequenceFileVertexInputFormat<I extends WritableComparable<I>,
+                                           V extends Writable,
+                                           E extends Writable,
+                                           M extends Writable,
+                                           X extends BasicVertex<I, V, E, M>>
+    extends VertexInputFormat<I, V, E, M> {
+  protected SequenceFileInputFormat<I, X> sequenceFileInputFormat
+      = new SequenceFileInputFormat<I, X>();
+
+  @Override public List<InputSplit> getSplits(JobContext context, int 
numWorkers)
+      throws IOException, InterruptedException {
+    return sequenceFileInputFormat.getSplits(context);
+  }
+
+  @Override
+  public VertexReader<I, V, E, M> createVertexReader(InputSplit split,
+      TaskAttemptContext context)
+      throws IOException {
+    return new SequenceFileVertexReader<I, V, E, M, X>(
+        sequenceFileInputFormat.createRecordReader(split, context));
+  }
+
+  public static class SequenceFileVertexReader<I extends WritableComparable<I>,
+      V extends Writable, E extends Writable, M extends Writable,
+      X extends BasicVertex<I, V, E, M>>
+      implements VertexReader<I, V, E, M> {
+    private final RecordReader<I, X> recordReader;
+
+    public SequenceFileVertexReader(RecordReader<I, X> recordReader) {
+      this.recordReader = recordReader;
+    }
+
+    @Override public void initialize(InputSplit inputSplit, TaskAttemptContext 
context)
+        throws IOException, InterruptedException {
+      recordReader.initialize(inputSplit, context);
+    }
+
+    @Override public boolean nextVertex() throws IOException, 
InterruptedException {
+      return recordReader.nextKeyValue();
+    }
+
+    @Override public BasicVertex<I, V, E, M> getCurrentVertex()
+        throws IOException, InterruptedException {
+      return recordReader.getCurrentValue();
+    }
+
+
+    @Override public void close() throws IOException {
+      recordReader.close();
+    }
+
+    @Override public float getProgress() throws IOException, 
InterruptedException {
+      return recordReader.getProgress();
+    }
+  }
+}

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextDoubleDoubleAdjacencyListVertexInputFormat.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- 
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextDoubleDoubleAdjacencyListVertexInputFormat.java
 (original)
+++ 
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextDoubleDoubleAdjacencyListVertexInputFormat.java
 Wed Nov  2 15:40:02 2011
@@ -21,6 +21,7 @@ import org.apache.giraph.graph.Edge;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -32,11 +33,11 @@ import java.io.IOException;
  * Strings and values as doubles.  This is a good inputformat for reading
  * graphs where the id types do not matter and can be stashed in a String.
  */
-public class TextDoubleDoubleAdjacencyListVertexInputFormat
-    extends TextVertexInputFormat<Text, DoubleWritable, DoubleWritable>  {
+public class TextDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable>
+    extends TextVertexInputFormat<Text, DoubleWritable, DoubleWritable, M>  {
 
-  static class VertexReader extends AdjacencyListVertexReader<Text,
-      DoubleWritable, DoubleWritable> {
+  static class VertexReader<M extends Writable> extends 
AdjacencyListVertexReader<Text,
+      DoubleWritable, DoubleWritable, M> {
 
     VertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
       super(lineRecordReader);
@@ -66,9 +67,12 @@ public class TextDoubleDoubleAdjacencyLi
   }
 
   @Override
-  public org.apache.giraph.graph.VertexReader createVertexReader(InputSplit 
split,
-                                 TaskAttemptContext context) throws 
IOException {
-    return new VertexReader(textInputFormat.createRecordReader(split, 
context));
+  public org.apache.giraph.graph.VertexReader<Text, DoubleWritable,
+    DoubleWritable, M> createVertexReader(
+      InputSplit split,
+      TaskAttemptContext context) throws IOException {
+    return new VertexReader<M>(textInputFormat.createRecordReader(
+      split, context));
   }
 
 }

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexInputFormat.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexInputFormat.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- 
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexInputFormat.java
 (original)
+++ 
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexInputFormat.java
 Wed Nov  2 15:40:02 2011
@@ -18,9 +18,6 @@
 
 package org.apache.giraph.lib;
 
-import java.io.IOException;
-import java.util.List;
-
 import org.apache.giraph.graph.VertexInputFormat;
 import org.apache.giraph.graph.VertexReader;
 import org.apache.hadoop.io.LongWritable;
@@ -33,6 +30,9 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 
+import java.io.IOException;
+import java.util.List;
+
 /**
  * Abstract class that users should subclass to use their own text based
 * vertex input format.
@@ -43,8 +43,11 @@ import org.apache.hadoop.mapreduce.lib.i
  */
 @SuppressWarnings("rawtypes")
 public abstract class TextVertexInputFormat<
-        I extends WritableComparable, V extends Writable, E extends Writable>
-        extends VertexInputFormat<I, V, E> {
+        I extends WritableComparable,
+        V extends Writable,
+        E extends Writable,
+        M extends Writable>
+        extends VertexInputFormat<I, V, E, M> {
     /** Uses the TextInputFormat to do everything */
     protected TextInputFormat textInputFormat = new TextInputFormat();
 
@@ -58,8 +61,8 @@ public abstract class TextVertexInputFor
      * @param <E> Edge value
      */
     public static abstract class TextVertexReader<I extends WritableComparable,
-            V extends Writable, E extends Writable>
-            implements VertexReader<I, V, E> {
+            V extends Writable, E extends Writable, M extends Writable>
+            implements VertexReader<I, V, E, M> {
         /** Internal line record reader */
         private final RecordReader<LongWritable, Text> lineRecordReader;
         /** Context passed to initialize */

Modified: 
incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- 
incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java 
(original)
+++ 
incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java 
Wed Nov  2 15:40:02 2011
@@ -388,7 +388,6 @@ public class ZooKeeperManager {
      */
     private void getZooKeeperServerList()
             throws IOException, InterruptedException {
-        int serverListFileAttempt = 0;
         String serverListFile = null;
 
         if (taskPartition == 0) {
@@ -409,7 +408,6 @@ public class ZooKeeperManager {
             if (serverListFile != null) {
                 break;
             }
-            ++serverListFileAttempt;
             try {
                 Thread.sleep(pollMsecs);
             } catch (InterruptedException e) {

Modified: 
incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java 
(original)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java 
Wed Nov  2 15:40:02 2011
@@ -20,23 +20,23 @@ package org.apache.giraph;
 
 import junit.framework.Test;
 import junit.framework.TestSuite;
+import 
org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
+import 
org.apache.giraph.examples.SimpleShortestPathsVertex.SimpleShortestPathsVertexOutputFormat;
+import 
org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
+import 
org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
 import org.apache.giraph.examples.GeneratedVertexReader;
 import org.apache.giraph.examples.SimpleCombinerVertex;
 import org.apache.giraph.examples.SimpleFailVertex;
 import org.apache.giraph.examples.SimpleMsgVertex;
 import org.apache.giraph.examples.SimplePageRankVertex;
-import 
org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat;
 import org.apache.giraph.examples.SimpleShortestPathsVertex;
-import 
org.apache.giraph.examples.SimpleShortestPathsVertex.SimpleShortestPathsVertexOutputFormat;
 import org.apache.giraph.examples.SimpleSumCombiner;
 import org.apache.giraph.examples.SimpleSuperstepVertex;
-import 
org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat;
-import 
org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat;
 import org.apache.giraph.graph.BspUtils;
 import org.apache.giraph.graph.GiraphJob;
 import org.apache.giraph.graph.GraphState;
-import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.graph.VertexInputFormat;
+import org.apache.giraph.graph.BasicVertex;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -100,15 +100,12 @@ public class TestBspBasic extends BspCas
         GraphState<LongWritable, IntWritable, FloatWritable, IntWritable> gs =
             new GraphState<LongWritable, IntWritable,
                            FloatWritable, IntWritable>();
-        Vertex<LongWritable, IntWritable, FloatWritable, IntWritable> vertex =
-            BspUtils.<LongWritable, IntWritable, FloatWritable, IntWritable>
-            createVertex(job.getConfiguration(), gs);
-        System.out.println("testInstantiateVertex: superstep=" +
-                           vertex.getSuperstep());
-        VertexInputFormat<LongWritable, IntWritable, FloatWritable>
-            inputFormat =
-                BspUtils.<LongWritable, IntWritable, FloatWritable>
-                createVertexInputFormat(job.getConfiguration());
+        BasicVertex<LongWritable, IntWritable, FloatWritable, IntWritable> 
vertex =
+            BspUtils.createVertex(job.getConfiguration());
+        System.out.println("testInstantiateVertex: Got vertex " + vertex +
+                           ", graphState" + gs);
+        VertexInputFormat<LongWritable, IntWritable, FloatWritable, 
IntWritable>
+            inputFormat = 
BspUtils.createVertexInputFormat(job.getConfiguration());
         List<InputSplit> splitArray =
             inputFormat.getSplits(
                 new JobContext(new Configuration(), new JobID()), 1);
@@ -285,9 +282,11 @@ public class TestBspBasic extends BspCas
             System.out.println("testBspPageRank: maxPageRank=" + maxPageRank +
                                " minPageRank=" + minPageRank +
                                " numVertices=" + numVertices);
-            assertTrue(maxPageRank > 34.030 && maxPageRank < 34.0301);
-            assertTrue(minPageRank > 0.03 && minPageRank < 0.03001);
-            assertTrue(numVertices == 5);
+            assertTrue("34.030 !< " + maxPageRank + " !< " + " 34.0301",
+                maxPageRank > 34.030 && maxPageRank < 34.0301);
+            assertTrue("0.03 !< " + minPageRank + " !< " + "0.03001",
+                minPageRank > 0.03 && minPageRank < 0.03001);
+            assertTrue("numVertices = " + numVertices + " != 5", numVertices 
== 5);
         }
     }
 

Modified: 
incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestAdjacencyListTextVertexOutputFormat.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestAdjacencyListTextVertexOutputFormat.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- 
incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestAdjacencyListTextVertexOutputFormat.java
 (original)
+++ 
incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestAdjacencyListTextVertexOutputFormat.java
 Wed Nov  2 15:40:02 2011
@@ -30,11 +30,8 @@ import org.mockito.Matchers;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
-
 import static 
org.apache.giraph.lib.AdjacencyListTextVertexOutputFormat.AdjacencyListVertexWriter;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -51,7 +48,7 @@ public class TestAdjacencyListTextVertex
     // Create empty iterator == no edges
     when(vertex.iterator()).thenReturn(new ArrayList<Text>().iterator());
 
-    RecordWriter<Text,Text> tw = mock(RecordWriter.class);
+    RecordWriter<Text, Text> tw = mock(RecordWriter.class);
     AdjacencyListVertexWriter writer = new AdjacencyListVertexWriter(tw);
     writer.initialize(tac);
     writer.writeVertex(vertex);

Modified: 
incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- 
incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
 (original)
+++ 
incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
 Wed Nov  2 15:40:02 2011
@@ -19,8 +19,11 @@ package org.apache.giraph.lib;
 
 
 import junit.framework.TestCase;
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.GiraphJob;
-import org.apache.giraph.graph.MutableVertex;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.DoubleWritable;
@@ -31,10 +34,11 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import java.io.IOException;
+import java.util.Iterator;
 
+import static 
org.apache.giraph.lib.TestTextDoubleDoubleAdjacencyListVertexInputFormat.assertValidVertex;
+import static 
org.apache.giraph.lib.TestTextDoubleDoubleAdjacencyListVertexInputFormat.setGraphState;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 public class TestLongDoubleDoubleAdjacencyListVertexInputFormat extends 
TestCase {
@@ -42,13 +46,16 @@ public class TestLongDoubleDoubleAdjacen
   private RecordReader<LongWritable, Text> rr;
   private Configuration conf;
   private TaskAttemptContext tac;
+  private GraphState<LongWritable, DoubleWritable, DoubleWritable, 
BooleanWritable> graphState;
 
   public void setUp() throws IOException, InterruptedException {
     rr = mock(RecordReader.class);
     when(rr.nextKeyValue()).thenReturn(true);
     conf = new Configuration();
+    conf.setClass(GiraphJob.VERTEX_CLASS, DummyVertex.class, 
BasicVertex.class);
     conf.setClass(GiraphJob.VERTEX_INDEX_CLASS, LongWritable.class, 
Writable.class);
     conf.setClass(GiraphJob.VERTEX_VALUE_CLASS, DoubleWritable.class, 
Writable.class);
+    graphState = mock(GraphState.class);
     tac = mock(TaskAttemptContext.class);
     when(tac.getConfiguration()).thenReturn(conf);
   }
@@ -57,15 +64,14 @@ public class TestLongDoubleDoubleAdjacen
     String input = "123";
 
     when(rr.getCurrentValue()).thenReturn(new Text(input));
-    LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr =
-        new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr);
+    
LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr 
=
+        new 
LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
 
     vr.initialize(null, tac);
-    MutableVertex<LongWritable, DoubleWritable, DoubleWritable, 
BooleanWritable>
-        mutableVertex = mock(MutableVertex.class);
 
     try {
-      vr.next(mutableVertex);
+      vr.nextVertex();
+      vr.getCurrentVertex();
       fail("Should have thrown an IllegalArgumentException");
     } catch (IllegalArgumentException iae) {
       assertTrue(iae.getMessage().startsWith("Line did not split correctly: 
"));
@@ -80,53 +86,60 @@ public class TestLongDoubleDoubleAdjacen
         new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr);
 
     vr.initialize(null, tac);
-    MutableVertex<LongWritable, DoubleWritable, DoubleWritable, 
BooleanWritable>
-            mutableVertex = mock(MutableVertex.class);
+
     try {
-      vr.next(mutableVertex);
+      vr.nextVertex();
+      vr.getCurrentVertex();
       fail("Should have thrown an IllegalArgumentException");
     } catch (IllegalArgumentException iae) {
       assertTrue(iae.getMessage().startsWith("Line did not split correctly: 
"));
     }
   }
 
-  public void testHappyPath() throws IOException, InterruptedException {
+  public void testHappyPath() throws Exception {
     String input = "42\t0.1\t99\t0.2\t2000\t0.3\t4000\t0.4";
 
     when(rr.getCurrentValue()).thenReturn(new Text(input));
-    LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr =
-        new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr);
+    
LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr 
=
+        new 
LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
 
     vr.initialize(null, tac);
-    MutableVertex<LongWritable, DoubleWritable, DoubleWritable, 
BooleanWritable>
-        mutableVertex = mock(MutableVertex.class);
 
-    assertTrue("Should have been able to read vertex", vr.next(mutableVertex));
-    verify(mutableVertex).setVertexId(new LongWritable(42));
-    verify(mutableVertex).setVertexValue(new DoubleWritable(0.1d));
-    verify(mutableVertex).addEdge(new LongWritable(99l), new 
DoubleWritable(0.2d));
-    verify(mutableVertex).addEdge(new LongWritable(2000l), new 
DoubleWritable(0.3d));
-    verify(mutableVertex).addEdge(new LongWritable(4000l), new 
DoubleWritable(0.4d));
-    verifyNoMoreInteractions(mutableVertex);
+    assertTrue("Should have been able to read vertex", vr.nextVertex());
+    BasicVertex<LongWritable, DoubleWritable, DoubleWritable, BooleanWritable>
+        vertex = vr.getCurrentVertex();
+    setGraphState(vertex, graphState);
+    assertValidVertex(conf, graphState, vertex,
+        new LongWritable(42), new DoubleWritable(0.1),
+        new Edge<LongWritable, DoubleWritable>(new LongWritable(99), new 
DoubleWritable(0.2)),
+        new Edge<LongWritable, DoubleWritable>(new LongWritable(2000), new 
DoubleWritable(0.3)),
+        new Edge<LongWritable, DoubleWritable>(new LongWritable(4000), new 
DoubleWritable(0.4)));
+    assertEquals(vertex.getNumOutEdges(), 3);
   }
 
-  public void testDifferentSeparators() throws IOException, 
InterruptedException {
+  public void testDifferentSeparators() throws Exception {
     String input = "12345:42.42:9999999:99.9";
 
     when(rr.getCurrentValue()).thenReturn(new Text(input));
     conf.set(AdjacencyListVertexReader.LINE_TOKENIZE_VALUE, ":");
-    LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr =
-        new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr);
+    
LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr 
=
+        new 
LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
 
     vr.initialize(null, tac);
-    MutableVertex<LongWritable, DoubleWritable, DoubleWritable, 
BooleanWritable>
-        mutableVertex = mock(MutableVertex.class);
-    assertTrue("Should have been able to read vertex", vr.next(mutableVertex));
-    verify(mutableVertex).setVertexId(new LongWritable(12345l));
-    verify(mutableVertex).setVertexValue(new DoubleWritable(42.42d));
-    verify(mutableVertex).addEdge(new LongWritable(9999999l),
-        new DoubleWritable(99.9d));
-    verifyNoMoreInteractions(mutableVertex);
+    assertTrue("Should have been able to read vertex", vr.nextVertex());
+    BasicVertex<LongWritable, DoubleWritable, DoubleWritable, BooleanWritable>
+        vertex = vr.getCurrentVertex();
+    setGraphState(vertex, graphState);
+    assertValidVertex(conf, graphState, vertex, new LongWritable(12345), new 
DoubleWritable(42.42),
+       new Edge<LongWritable, DoubleWritable>(new LongWritable(9999999), new 
DoubleWritable(99.9)));
+    assertEquals(vertex.getNumOutEdges(), 1);
   }
 
+  /**
+   * Minimal concrete {@link Vertex} whose {@code compute} does nothing.
+   * It is registered under {@code GiraphJob.VERTEX_CLASS} in
+   * {@code setUp()}; presumably this is the class the vertex reader under
+   * test instantiates (TODO confirm against the reader implementation).
+   */
+  public static class DummyVertex
+      extends Vertex<LongWritable, DoubleWritable, DoubleWritable, 
BooleanWritable> {
+    @Override
+    public void compute(Iterator<BooleanWritable> msgIterator) throws 
IOException {
+      // Intentionally empty: no computation is performed by this vertex.
+    }
+  }
 }

Modified: 
incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
URL: 
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1196639&r1=1196638&r2=1196639&view=diff
==============================================================================
--- 
incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
 (original)
+++ 
incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java
 Wed Nov  2 15:40:02 2011
@@ -18,23 +18,33 @@
 package org.apache.giraph.lib;
 
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import junit.framework.TestCase;
+import org.apache.giraph.graph.BasicVertex;
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Edge;
 import org.apache.giraph.graph.GiraphJob;
-import org.apache.giraph.graph.MutableVertex;
+import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 public class TestTextDoubleDoubleAdjacencyListVertexInputFormat extends 
TestCase {
@@ -42,13 +52,16 @@ public class TestTextDoubleDoubleAdjacen
   private RecordReader<LongWritable, Text> rr;
   private Configuration conf;
   private TaskAttemptContext tac;
+  private GraphState<Text, DoubleWritable, DoubleWritable, BooleanWritable> 
graphState;
 
   public void setUp() throws IOException, InterruptedException {
     rr = mock(RecordReader.class);
     when(rr.nextKeyValue()).thenReturn(true).thenReturn(false);
     conf = new Configuration();
+    conf.setClass(GiraphJob.VERTEX_CLASS, DummyVertex.class, 
BasicVertex.class);
     conf.setClass(GiraphJob.VERTEX_INDEX_CLASS, Text.class, Writable.class);
     conf.setClass(GiraphJob.VERTEX_VALUE_CLASS, DoubleWritable.class, 
Writable.class);
+    graphState = mock(GraphState.class);
     tac = mock(TaskAttemptContext.class);
     when(tac.getConfiguration()).thenReturn(conf);
   }
@@ -57,15 +70,14 @@ public class TestTextDoubleDoubleAdjacen
     String input = "hi";
 
     when(rr.getCurrentValue()).thenReturn(new Text(input));
-    TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr =
-        new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr);
+    
TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr 
=
+        new 
TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
 
     vr.initialize(null, tac);
-    MutableVertex<Text, DoubleWritable, DoubleWritable, BooleanWritable>
-        mutableVertex = mock(MutableVertex.class);
 
     try {
-      vr.next(mutableVertex);
+      vr.nextVertex();
+      vr.getCurrentVertex();
       fail("Should have thrown an IllegalArgumentException");
     } catch (IllegalArgumentException iae) {
       assertTrue(iae.getMessage().startsWith("Line did not split correctly: 
"));
@@ -76,41 +88,83 @@ public class TestTextDoubleDoubleAdjacen
     String input = "index\t55.66\tindex2";
 
     when(rr.getCurrentValue()).thenReturn(new Text(input));
-    TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr =
-        new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr);
-
+    
TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr 
=
+        new 
TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
     vr.initialize(null, tac);
-    MutableVertex<Text, DoubleWritable, DoubleWritable, BooleanWritable>
-        mutableVertex = mock(MutableVertex.class);
     try {
-      vr.next(mutableVertex);
+      vr.nextVertex();
+      vr.getCurrentVertex();
       fail("Should have thrown an IllegalArgumentException");
     } catch (IllegalArgumentException iae) {
       assertTrue(iae.getMessage().startsWith("Line did not split correctly: 
"));
     }
   }
 
-  public void testHappyPath() throws IOException, InterruptedException {
+  /**
+   * Reflectively invokes {@code setGraphState(GraphState)} on the given
+   * vertex. The method is looked up as one declared on {@link BasicVertex}
+   * itself and is made accessible before the call, so it is presumably not
+   * public (TODO confirm against BasicVertex).
+   *
+   * @param vertex vertex on which the graph state is set
+   * @param graphState graph state handed to the vertex
+   * @throws Exception if the reflective lookup or invocation fails
+   */
+  public static void setGraphState(BasicVertex vertex, GraphState graphState) 
throws Exception {
+    Class<? extends BasicVertex> c = BasicVertex.class;
+    Method m = c.getDeclaredMethod("setGraphState", GraphState.class);
+    // Lift access checks so the lookup result can be invoked from this test.
+    m.setAccessible(true);
+    m.invoke(vertex, graphState);
+  }
+
+  /**
+   * Builds an "expected" vertex from the given id, value and edges and
+   * asserts that {@code actual} matches it via
+   * {@link #assertValid(BasicVertex, BasicVertex)}.
+   *
+   * The expected vertex is created with {@code BspUtils.createVertex(conf)}
+   * (presumably the class registered under GiraphJob.VERTEX_CLASS; confirm
+   * against BspUtils) and receives the same graph state as the caller's.
+   *
+   * @param conf configuration used to create the expected vertex
+   * @param graphState graph state set on the expected vertex
+   * @param actual vertex produced by the code under test
+   * @param expectedId id the expected vertex is initialized with
+   * @param expectedValue value the expected vertex is initialized with
+   * @param edges expected out-edges; they are collected into a map keyed by
+   *        destination id, so a repeated destination keeps only the last
+   *        edge value given
+   * @throws Exception propagated from the reflective graph-state setter
+   */
+  public static <I extends WritableComparable, V extends Writable,
+      E extends Writable, M extends Writable> void 
assertValidVertex(Configuration conf,
+      GraphState<I, V, E, M> graphState, BasicVertex<I, V, E, M> actual,
+      I expectedId, V expectedValue, Edge<I, E>... edges)
+      throws Exception {
+    BasicVertex<I, V, E, M> expected = BspUtils.createVertex(conf);
+    setGraphState(expected, graphState);
+
+    // FIXME! maybe can't work if not instantiated properly
+    Map<I, E> edgeMap = Maps.newHashMap();
+    for(Edge<I, E> edge : edges) {
+      edgeMap.put(edge.getDestVertexId(), edge.getEdgeValue());
+    }
+    // The last argument is passed as null; presumably it is the initial
+    // message list (TODO confirm against BasicVertex.initialize).
+    expected.initialize(expectedId, expectedValue, edgeMap, null);
+    assertValid(expected, actual);
+  }
+
+  /**
+   * Asserts that two vertices agree on id, value, reported edge count and,
+   * after sorting, on every out-edge (destination id plus edge value).
+   *
+   * Edges are gathered by iterating each vertex, which yields destination
+   * ids, and looking up the edge value for each id. Both lists are sorted
+   * so the comparison does not depend on iteration order.
+   *
+   * @param expected reference vertex
+   * @param actual vertex produced by the code under test
+   */
+  public static <I extends WritableComparable, V extends Writable,
+      E extends Writable, M extends Writable> void assertValid(
+      BasicVertex<I, V, E, M> expected, BasicVertex<I, V, E, M> actual) {
+    assertEquals(expected.getVertexId(), actual.getVertexId());
+    assertEquals(expected.getVertexValue(), actual.getVertexValue());
+    assertEquals(expected.getNumEdges(), actual.getNumEdges());
+    List<Edge<I, E>> expectedEdges = Lists.newArrayList();
+    List<Edge<I, E>> actualEdges = Lists.newArrayList();
+    for (I actualDestId : actual) {
+      actualEdges.add(
+          new Edge<I, E>(actualDestId, actual.getEdgeValue(actualDestId)));
+    }
+    for (I expectedDestId : expected) {
+      expectedEdges.add(
+          new Edge<I, E>(expectedDestId,
+              expected.getEdgeValue(expectedDestId)));
+    }
+    Collections.sort(expectedEdges);
+    Collections.sort(actualEdges);
+    // Check the iterated edge counts explicitly: the loop below is bounded
+    // by the expected list, so without this a shorter actual list would
+    // raise IndexOutOfBoundsException and a longer one would go unnoticed.
+    assertEquals(expectedEdges.size(), actualEdges.size());
+    for (int i = 0; i < expectedEdges.size(); i++) {
+      assertEquals(expectedEdges.get(i), actualEdges.get(i));
+    }
+  }
+
+  public void testHappyPath() throws Exception {
     String input = "Hi\t0\tCiao\t1.123\tBomdia\t2.234\tOla\t3.345";
 
     when(rr.getCurrentValue()).thenReturn(new Text(input));
-    TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr =
-        new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr);
+    
TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr 
=
+        new 
TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
 
     vr.initialize(null, tac);
-    MutableVertex<Text, DoubleWritable, DoubleWritable, BooleanWritable>
-        mutableVertex = mock(MutableVertex.class);
-
-    assertTrue("Should have been able to read vertex", vr.next(mutableVertex));
-    verify(mutableVertex).setVertexId(new Text("Hi"));
-    verify(mutableVertex).setVertexValue(new DoubleWritable(0));
-    verify(mutableVertex).addEdge(new Text("Ciao"), new 
DoubleWritable(1.123d));
-    verify(mutableVertex).addEdge(new Text("Bomdia"), new 
DoubleWritable(2.234d));
-    verify(mutableVertex).addEdge(new Text("Ola"), new DoubleWritable(3.345d));
-    verifyNoMoreInteractions(mutableVertex);
+    assertTrue("Should have been able to add a vertex", vr.nextVertex());
+    BasicVertex<Text, DoubleWritable, DoubleWritable, BooleanWritable> vertex =
+        vr.getCurrentVertex();
+    setGraphState(vertex, graphState);
+    assertValidVertex(conf, graphState, vertex, new Text("Hi"), new 
DoubleWritable(0),
+        new Edge<Text, DoubleWritable>(new Text("Ciao"), new 
DoubleWritable(1.123d)),
+        new Edge<Text, DoubleWritable>(new Text("Bomdia"), new 
DoubleWritable(2.234d)),
+        new Edge<Text, DoubleWritable>(new Text("Ola"), new 
DoubleWritable(3.345d)));
+    assertEquals(vertex.getNumOutEdges(), 3);
   }
 
-  public void testLineSanitizer() throws IOException, InterruptedException {
+  public void testLineSanitizer() throws Exception {
     String input = "Bye\t0.01\tCiao\t1.001\tTchau\t2.0001\tAdios\t3.00001";
 
     AdjacencyListVertexReader.LineSanitizer toUpper =
@@ -122,37 +176,46 @@ public class TestTextDoubleDoubleAdjacen
     };
 
     when(rr.getCurrentValue()).thenReturn(new Text(input));
-    TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr =
-        new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr, 
toUpper);
+    
TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr 
=
+        new 
TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr,
 toUpper);
 
     vr.initialize(null, tac);
-    MutableVertex<Text, DoubleWritable, DoubleWritable, BooleanWritable>
-            mutableVertex = mock(MutableVertex.class);
-    assertTrue("Should have been able to read vertex", vr.next(mutableVertex));
-    verify(mutableVertex).setVertexId(new Text("BYE"));
-    verify(mutableVertex).setVertexValue(new DoubleWritable(0.01d));
-    verify(mutableVertex).addEdge(new Text("CIAO"), new 
DoubleWritable(1.001d));
-    verify(mutableVertex).addEdge(new Text("TCHAU"), new 
DoubleWritable(2.0001d));
-    verify(mutableVertex).addEdge(new Text("ADIOS"), new 
DoubleWritable(3.00001d));
-    verifyNoMoreInteractions(mutableVertex);
+    assertTrue("Should have been able to read vertex", vr.nextVertex());
+    BasicVertex<Text, DoubleWritable, DoubleWritable, BooleanWritable> vertex =
+        vr.getCurrentVertex();
+    setGraphState(vertex, graphState);
+    assertValidVertex(conf, graphState, vertex,
+        new Text("BYE"), new DoubleWritable(0.01d),
+        new Edge<Text, DoubleWritable>(new Text("CIAO"), new 
DoubleWritable(1.001d)),
+        new Edge<Text, DoubleWritable>(new Text("TCHAU"), new 
DoubleWritable(2.0001d)),
+        new Edge<Text, DoubleWritable>(new Text("ADIOS"), new 
DoubleWritable(3.00001d)));
+
+    assertEquals(vertex.getNumOutEdges(), 3);
   }
 
-  public void testDifferentSeparators() throws IOException, 
InterruptedException {
+  public void testDifferentSeparators() throws Exception {
     String input = "alpha:42:beta:99";
 
     when(rr.getCurrentValue()).thenReturn(new Text(input));
     conf.set(AdjacencyListVertexReader.LINE_TOKENIZE_VALUE, ":");
-    TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr =
-        new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr);
+    
TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr 
=
+        new 
TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr);
 
     vr.initialize(null, tac);
-    MutableVertex<Text, DoubleWritable, DoubleWritable, BooleanWritable>
-        mutableVertex = mock(MutableVertex.class);
-    assertTrue("Should have been able to read vertex", vr.next(mutableVertex));
-    verify(mutableVertex).setVertexId(new Text("alpha"));
-    verify(mutableVertex).setVertexValue(new DoubleWritable(42));
-    verify(mutableVertex).addEdge(new Text("beta"), new DoubleWritable(99));
-    verifyNoMoreInteractions(mutableVertex);
+    assertTrue("Should have been able to read vertex", vr.nextVertex());
+    BasicVertex<Text, DoubleWritable, DoubleWritable, BooleanWritable> vertex =
+        vr.getCurrentVertex();
+    setGraphState(vertex, graphState);
+    assertValidVertex(conf, graphState, vertex, new Text("alpha"), new 
DoubleWritable(42d),
+        new Edge<Text, DoubleWritable>(new Text("beta"), new 
DoubleWritable(99d)));
+    assertEquals(vertex.getNumOutEdges(), 1);
   }
 
+  /**
+   * Minimal concrete {@link Vertex} whose {@code compute} does nothing.
+   * It is registered under {@code GiraphJob.VERTEX_CLASS} in
+   * {@code setUp()}, which is the configuration that
+   * {@code assertValidVertex} passes to {@code BspUtils.createVertex}.
+   */
+  public static class DummyVertex
+      extends Vertex<Text, DoubleWritable, DoubleWritable, BooleanWritable> {
+    @Override
+    public void compute(Iterator<BooleanWritable> msgIterator) throws 
IOException {
+      // Intentionally empty: no computation is performed by this vertex.
+    }
+  }
 }


Reply via email to