Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java?rev=1196639&r1=1196638&r2=1196639&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexRange.java Wed Nov 2 15:40:02 2011 @@ -60,8 +60,8 @@ public class VertexRange<I extends Writa /** Checkpoint file prefix (null if not recovering from a checkpoint) */ private String checkpointfilePrefix = null; /** Vertex map for this range (keyed by index) */ - private final SortedMap<I, Vertex<I, V, E, M>> vertexMap = - new TreeMap<I, Vertex<I, V, E, M>>(); + private final SortedMap<I, BasicVertex<I, V, E, M>> vertexMap = + new TreeMap<I, BasicVertex<I, V, E, M>>(); /** Class logger */ private static final Logger LOG = Logger.getLogger(VertexRange.class); @@ -199,7 +199,7 @@ public class VertexRange<I extends Writa * * @return Map of vertices (keyed by index) */ - public SortedMap<I, Vertex<I, V, E, M>> getVertexMap() { + public SortedMap<I, BasicVertex<I, V, E, M>> getVertexMap() { return vertexMap; }
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java?rev=1196639&r1=1196638&r2=1196639&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexReader.java Wed Nov 2 15:40:02 2011 @@ -18,18 +18,19 @@ package org.apache.giraph.graph; -import java.io.IOException; - import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import java.io.IOException; + @SuppressWarnings("rawtypes") public interface VertexReader< I extends WritableComparable, V extends Writable, - E extends Writable> { + E extends Writable, + M extends Writable> { /** * Use the input split and context to setup reading the vertices. * Guaranteed to be called prior to any other function. @@ -43,14 +44,20 @@ public interface VertexReader< throws IOException, InterruptedException; /** - * Reads the next vertex and associated data * - * @param vertex set the properties of this vertex - * @return true iff a vertex and associated data was read, false if at EOF + * @return false iff there are no more vertices + * @throws IOException * @throws InterruptedException */ - boolean next(MutableVertex<I, V, E, ?> vertex) - throws IOException, InterruptedException; + boolean nextVertex() throws IOException, InterruptedException; + + /** + * + * @return the current vertex which has been read. nextVertex() should be called first. + * @throws IOException + * @throws InterruptedException + */ + BasicVertex<I, V, E, M> getCurrentVertex() throws IOException, InterruptedException; /** * Close this {@link VertexReader} to future operations. Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java?rev=1196639&r1=1196638&r2=1196639&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/VertexResolver.java Wed Nov 2 15:40:02 2011 @@ -111,8 +111,10 @@ public class VertexResolver<I extends Wr } @Override - public MutableVertex<I, V, E, M> instantiateVertex() { - return BspUtils.<I, V, E, M>createVertex(getConf(), graphState); + public BasicVertex<I, V, E, M> instantiateVertex() { + BasicVertex<I, V, E, M> vertex = BspUtils.<I,V,E,M>createVertex(getConf()); + vertex.setGraphState(graphState); + return vertex; } @Override Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListTextVertexOutputFormat.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListTextVertexOutputFormat.java?rev=1196639&r1=1196638&r2=1196639&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListTextVertexOutputFormat.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListTextVertexOutputFormat.java Wed Nov 2 15:40:02 2011 @@ -37,6 +37,7 @@ import java.io.IOException; * @param <V> Vertex value * @param <E> Edge value */ +@SuppressWarnings("rawtypes") public class AdjacencyListTextVertexOutputFormat <I extends WritableComparable, V extends Writable, E extends Writable> extends TextVertexOutputFormat<I, V, E>{ @@ -54,7 +55,7 @@ public class AdjacencyListTextVertexOutp @Override public void writeVertex(BasicVertex<I, V, E, ?> vertex) throws IOException, InterruptedException { - if(delimiter == null) { + if (delimiter == null) { delimiter = getContext().getConfiguration() .get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT); } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java?rev=1196639&r1=1196638&r2=1196639&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java Wed Nov 2 15:40:02 2011 @@ -17,9 +17,10 @@ */ package org.apache.giraph.lib; +import com.google.common.collect.Maps; +import org.apache.giraph.graph.BasicVertex; import org.apache.giraph.graph.BspUtils; import org.apache.giraph.graph.Edge; -import org.apache.giraph.graph.MutableVertex; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; @@ -28,6 +29,7 @@ import org.apache.hadoop.io.WritableComp import org.apache.hadoop.mapreduce.RecordReader; import java.io.IOException; +import java.util.Map; /** * VertexReader that readers lines of text with vertices encoded as adjacency @@ -41,9 +43,10 @@ import java.io.IOException; * @param <V> Vertex value * @param <E> Edge value */ +@SuppressWarnings("rawtypes") abstract class AdjacencyListVertexReader<I extends WritableComparable, - V extends Writable, E extends Writable> extends - TextVertexInputFormat.TextVertexReader<I, V, E> { + V extends Writable, E extends Writable, M extends Writable> extends + TextVertexInputFormat.TextVertexReader<I, V, E, M> { public static final String LINE_TOKENIZE_VALUE = "adj.list.input.delimiter"; public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t"; @@ -94,18 +97,17 @@ abstract class AdjacencyListVertexReader */ abstract public void decodeEdge(String id, String value, Edge<I, E> edge); - /** - * {@inheritDoc} - */ + @Override - public boolean next(MutableVertex<I, V, E, ?> iveMutableVertex) - throws IOException, InterruptedException { - if (!getRecordReader().nextKeyValue()) { - return false; - } + public boolean nextVertex() throws IOException, InterruptedException { + return getRecordReader().nextKeyValue(); + } + @Override + public BasicVertex<I, V, E, M> getCurrentVertex() throws IOException, InterruptedException { Configuration conf = getContext().getConfiguration(); String line = getRecordReader().getCurrentValue().toString(); + BasicVertex<I, V, E, M> vertex = BspUtils.createVertex(conf); if (sanitizer != null) { line = sanitizer.sanitize(line); @@ -123,20 +125,19 @@ abstract class AdjacencyListVertexReader I vertexId = BspUtils.<I>createVertexIndex(conf); decodeId(values[0], vertexId); - iveMutableVertex.setVertexId(vertexId); V value = BspUtils.<V>createVertexValue(conf); decodeValue(values[1], value); - iveMutableVertex.setVertexValue(value); int i = 2; + Map<I, E> edges = Maps.newHashMap(); Edge<I, E> edge = new Edge<I, E>(); while(i < values.length) { decodeEdge(values[i], values[i + 1], edge); - iveMutableVertex.addEdge(edge.getDestVertexId(), edge.getEdgeValue()); + edges.put(edge.getDestVertexId(), edge.getEdgeValue()); i += 2; } - - return true; + vertex.initialize(vertexId, value, edges, null); + return vertex; } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexInputFormat.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexInputFormat.java?rev=1196639&r1=1196638&r2=1196639&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexInputFormat.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/JsonBase64VertexInputFormat.java Wed Nov 2 15:40:02 2011 @@ -18,10 +18,11 @@ package org.apache.giraph.lib; +import com.google.common.collect.Maps; import net.iharder.Base64; +import org.apache.giraph.graph.BasicVertex; import org.apache.giraph.graph.BspUtils; import org.apache.giraph.graph.Edge; -import org.apache.giraph.graph.MutableVertex; import org.apache.giraph.graph.VertexReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; @@ -39,6 +40,7 @@ import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.DataInputStream; import java.io.IOException; +import java.util.Map; /** * Simple way to represent the structure of the graph with a JSON object. @@ -52,8 +54,9 @@ import java.io.IOException; */ @SuppressWarnings("rawtypes") public class JsonBase64VertexInputFormat< - I extends WritableComparable, V extends Writable, E extends Writable> - extends TextVertexInputFormat<I, V, E> implements + I extends WritableComparable, V extends Writable, E extends Writable, + M extends Writable> + extends TextVertexInputFormat<I, V, E, M> implements JsonBase64VertexFormat { /** * Simple reader that supports {@link JsonBase64VertexInputFormat} @@ -64,25 +67,27 @@ public class JsonBase64VertexInputFormat */ private static class JsonBase64VertexReader< I extends WritableComparable, V extends Writable, - E extends Writable> extends TextVertexReader<I, V, E> { + E extends Writable, M extends Writable> extends TextVertexReader<I, V, E, M> { /** * Only constructor. Requires the LineRecordReader * * @param lineRecordReader Line record reader to read from */ - public JsonBase64VertexReader( - RecordReader<LongWritable, Text> lineRecordReader) { + public JsonBase64VertexReader(RecordReader<LongWritable, Text> lineRecordReader) { super(lineRecordReader); } @Override - public boolean next(MutableVertex<I, V, E, ?> vertex) - throws IOException, InterruptedException { - if (!getRecordReader().nextKeyValue()) { - return false; - } + public boolean nextVertex() throws IOException, InterruptedException { + return getRecordReader().nextKeyValue(); + } + @Override + public BasicVertex<I, V, E, M> getCurrentVertex() + throws IOException, InterruptedException { Configuration conf = getContext().getConfiguration(); + BasicVertex<I, V, E, M> vertex = BspUtils.createVertex(conf); + Text line = getRecordReader().getCurrentValue(); JSONObject vertexObject; try { @@ -93,26 +98,26 @@ public class JsonBase64VertexInputFormat } DataInput input = null; byte[] decodedWritable = null; + I vertexId = null; try { decodedWritable = Base64.decode( vertexObject.getString(VERTEX_ID_KEY)); input = new DataInputStream( new ByteArrayInputStream(decodedWritable)); - I vertexId = BspUtils.<I>createVertexIndex(conf); + vertexId = BspUtils.<I>createVertexIndex(conf); vertexId.readFields(input); - vertex.setVertexId(vertexId); } catch (JSONException e) { throw new IllegalArgumentException( "next: Failed to get vertex id", e); } + V vertexValue = null; try { decodedWritable = Base64.decode( vertexObject.getString(VERTEX_VALUE_KEY)); input = new DataInputStream( new ByteArrayInputStream(decodedWritable)); - V vertexValue = BspUtils.<V>createVertexValue(conf); + vertexValue = BspUtils.<V>createVertexValue(conf); vertexValue.readFields(input); - vertex.setVertexValue(vertexValue); } catch (JSONException e) { throw new IllegalArgumentException( "next: Failed to get vertex value", e); @@ -124,6 +129,7 @@ public class JsonBase64VertexInputFormat throw new IllegalArgumentException( "next: Failed to get edge array", e); } + Map<I, E> edgeMap = Maps.newHashMap(); for (int i = 0; i < edgeArray.length(); ++i) { try { decodedWritable = @@ -137,17 +143,18 @@ public class JsonBase64VertexInputFormat Edge<I, E> edge = new Edge<I, E>(); edge.setConf(getContext().getConfiguration()); edge.readFields(input); - vertex.addEdge(edge.getDestVertexId(), edge.getEdgeValue()); + edgeMap.put(edge.getDestVertexId(), edge.getEdgeValue()); } - return true; + vertex.initialize(vertexId, vertexValue, edgeMap, null); + return vertex; } } @Override - public VertexReader<I, V, E> createVertexReader( + public VertexReader<I, V, E, M> createVertexReader( InputSplit split, TaskAttemptContext context) throws IOException { - return new JsonBase64VertexReader<I, V, E>( - textInputFormat.createRecordReader(split, context)); + return new JsonBase64VertexReader<I, V, E, M>(textInputFormat.createRecordReader(split, + context)); } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1196639&r1=1196638&r2=1196639&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java Wed Nov 2 15:40:02 2011 @@ -21,6 +21,7 @@ import org.apache.giraph.graph.Edge; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -35,11 +36,11 @@ import java.io.IOException; * to repesent a vertex with id 22, value of 0.1 and edges to nodes 45 and 99, * with values of 0.3 and 0.44, respectively. */ -public class LongDoubleDoubleAdjacencyListVertexInputFormat extends - TextVertexInputFormat<LongWritable, DoubleWritable, DoubleWritable> { +public class LongDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable> extends + TextVertexInputFormat<LongWritable, DoubleWritable, DoubleWritable, M> { - static class VertexReader extends - AdjacencyListVertexReader<LongWritable, DoubleWritable, DoubleWritable> { + static class VertexReader<M extends Writable> extends + AdjacencyListVertexReader<LongWritable, DoubleWritable, DoubleWritable, M> { VertexReader(RecordReader<LongWritable, Text> lineRecordReader) { super(lineRecordReader); @@ -69,8 +70,11 @@ public class LongDoubleDoubleAdjacencyLi } @Override - public org.apache.giraph.graph.VertexReader createVertexReader(InputSplit split, + public org.apache.giraph.graph.VertexReader<LongWritable, + DoubleWritable, DoubleWritable, M> createVertexReader( + InputSplit split, TaskAttemptContext context) throws IOException { - return new VertexReader(textInputFormat.createRecordReader(split, context)); + return new VertexReader<M>(textInputFormat.createRecordReader( + split, context)); } } Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/SequenceFileVertexInputFormat.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/SequenceFileVertexInputFormat.java?rev=1196639&view=auto ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/SequenceFileVertexInputFormat.java (added) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/SequenceFileVertexInputFormat.java Wed Nov 2 15:40:02 2011 @@ -0,0 +1,72 @@ +package org.apache.giraph.lib; + +import org.apache.giraph.graph.BasicVertex; +import org.apache.giraph.graph.VertexInputFormat; +import org.apache.giraph.graph.VertexReader; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; + +import java.io.IOException; +import java.util.List; + +public class SequenceFileVertexInputFormat<I extends WritableComparable<I>, + V extends Writable, + E extends Writable, + M extends Writable, + X extends BasicVertex<I, V, E, M>> + extends VertexInputFormat<I, V, E, M> { + protected SequenceFileInputFormat<I, X> sequenceFileInputFormat + = new SequenceFileInputFormat<I, X>(); + + @Override public List<InputSplit> getSplits(JobContext context, int numWorkers) + throws IOException, InterruptedException { + return sequenceFileInputFormat.getSplits(context); + } + + @Override + public VertexReader<I, V, E, M> createVertexReader(InputSplit split, + TaskAttemptContext context) + throws IOException { + return new SequenceFileVertexReader<I, V, E, M, X>( + sequenceFileInputFormat.createRecordReader(split, context)); + } + + public static class SequenceFileVertexReader<I extends WritableComparable<I>, + V extends Writable, E extends Writable, M extends Writable, + X extends BasicVertex<I, V, E, M>> + implements VertexReader<I, V, E, M> { + private final RecordReader<I, X> recordReader; + + public SequenceFileVertexReader(RecordReader<I, X> recordReader) { + this.recordReader = recordReader; + } + + @Override public void initialize(InputSplit inputSplit, TaskAttemptContext context) + throws IOException, InterruptedException { + recordReader.initialize(inputSplit, context); + } + + @Override public boolean nextVertex() throws IOException, InterruptedException { + return recordReader.nextKeyValue(); + } + + @Override public BasicVertex<I, V, E, M> getCurrentVertex() + throws IOException, InterruptedException { + return recordReader.getCurrentValue(); + } + + + @Override public void close() throws IOException { + recordReader.close(); + } + + @Override public float getProgress() throws IOException, InterruptedException { + return recordReader.getProgress(); + } + } +} Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextDoubleDoubleAdjacencyListVertexInputFormat.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1196639&r1=1196638&r2=1196639&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextDoubleDoubleAdjacencyListVertexInputFormat.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextDoubleDoubleAdjacencyListVertexInputFormat.java Wed Nov 2 15:40:02 2011 @@ -21,6 +21,7 @@ import org.apache.giraph.graph.Edge; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -32,11 +33,11 @@ import java.io.IOException; * Strings and values as doubles. This is a good inputformat for reading * graphs where the id types do not matter and can be stashed in a String. */ -public class TextDoubleDoubleAdjacencyListVertexInputFormat - extends TextVertexInputFormat<Text, DoubleWritable, DoubleWritable> { +public class TextDoubleDoubleAdjacencyListVertexInputFormat<M extends Writable> + extends TextVertexInputFormat<Text, DoubleWritable, DoubleWritable, M> { - static class VertexReader extends AdjacencyListVertexReader<Text, - DoubleWritable, DoubleWritable> { + static class VertexReader<M extends Writable> extends AdjacencyListVertexReader<Text, + DoubleWritable, DoubleWritable, M> { VertexReader(RecordReader<LongWritable, Text> lineRecordReader) { super(lineRecordReader); @@ -66,9 +67,12 @@ public class TextDoubleDoubleAdjacencyLi } @Override - public org.apache.giraph.graph.VertexReader createVertexReader(InputSplit split, - TaskAttemptContext context) throws IOException { - return new VertexReader(textInputFormat.createRecordReader(split, context)); + public org.apache.giraph.graph.VertexReader<Text, DoubleWritable, + DoubleWritable, M> createVertexReader( + InputSplit split, + TaskAttemptContext context) throws IOException { + return new VertexReader<M>(textInputFormat.createRecordReader( + split, context)); } } Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexInputFormat.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexInputFormat.java?rev=1196639&r1=1196638&r2=1196639&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexInputFormat.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/TextVertexInputFormat.java Wed Nov 2 15:40:02 2011 @@ -18,9 +18,6 @@ package org.apache.giraph.lib; -import java.io.IOException; -import java.util.List; - import org.apache.giraph.graph.VertexInputFormat; import org.apache.giraph.graph.VertexReader; import org.apache.hadoop.io.LongWritable; @@ -33,6 +30,9 @@ import org.apache.hadoop.mapreduce.Recor import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import java.io.IOException; +import java.util.List; + /** * Abstract class that users should subclass to use their own text based * vertex output format. @@ -43,8 +43,11 @@ import org.apache.hadoop.mapreduce.lib.i */ @SuppressWarnings("rawtypes") public abstract class TextVertexInputFormat< - I extends WritableComparable, V extends Writable, E extends Writable> - extends VertexInputFormat<I, V, E> { + I extends WritableComparable, + V extends Writable, + E extends Writable, + M extends Writable> + extends VertexInputFormat<I, V, E, M> { /** Uses the TextInputFormat to do everything */ protected TextInputFormat textInputFormat = new TextInputFormat(); @@ -58,8 +61,8 @@ public abstract class TextVertexInputFor * @param <E> Edge value */ public static abstract class TextVertexReader<I extends WritableComparable, - V extends Writable, E extends Writable> - implements VertexReader<I, V, E> { + V extends Writable, E extends Writable, M extends Writable> + implements VertexReader<I, V, E, M> { /** Internal line record reader */ private final RecordReader<LongWritable, Text> lineRecordReader; /** Context passed to initialize */ Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java?rev=1196639&r1=1196638&r2=1196639&view=diff ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java (original) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java Wed Nov 2 15:40:02 2011 @@ -388,7 +388,6 @@ public class ZooKeeperManager { */ private void getZooKeeperServerList() throws IOException, InterruptedException { - int serverListFileAttempt = 0; String serverListFile = null; if (taskPartition == 0) { @@ -409,7 +408,6 @@ public class ZooKeeperManager { if (serverListFile != null) { break; } - ++serverListFileAttempt; try { Thread.sleep(pollMsecs); } catch (InterruptedException e) { Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java?rev=1196639&r1=1196638&r2=1196639&view=diff ============================================================================== --- incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java (original) +++ incubator/giraph/trunk/src/test/java/org/apache/giraph/TestBspBasic.java Wed Nov 2 15:40:02 2011 @@ -20,23 +20,23 @@ package org.apache.giraph; import junit.framework.Test; import junit.framework.TestSuite; +import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat; +import org.apache.giraph.examples.SimpleShortestPathsVertex.SimpleShortestPathsVertexOutputFormat; +import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat; +import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat; import org.apache.giraph.examples.GeneratedVertexReader; import org.apache.giraph.examples.SimpleCombinerVertex; import org.apache.giraph.examples.SimpleFailVertex; import org.apache.giraph.examples.SimpleMsgVertex; import org.apache.giraph.examples.SimplePageRankVertex; -import org.apache.giraph.examples.SimplePageRankVertex.SimplePageRankVertexInputFormat; import org.apache.giraph.examples.SimpleShortestPathsVertex; -import org.apache.giraph.examples.SimpleShortestPathsVertex.SimpleShortestPathsVertexOutputFormat; import org.apache.giraph.examples.SimpleSumCombiner; import org.apache.giraph.examples.SimpleSuperstepVertex; -import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexInputFormat; -import org.apache.giraph.examples.SimpleSuperstepVertex.SimpleSuperstepVertexOutputFormat; import org.apache.giraph.graph.BspUtils; import org.apache.giraph.graph.GiraphJob; import org.apache.giraph.graph.GraphState; -import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexInputFormat; +import org.apache.giraph.graph.BasicVertex; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -100,15 +100,12 @@ public class TestBspBasic extends BspCas GraphState<LongWritable, IntWritable, FloatWritable, IntWritable> gs = new GraphState<LongWritable, IntWritable, FloatWritable, IntWritable>(); - Vertex<LongWritable, IntWritable, FloatWritable, IntWritable> vertex = - BspUtils.<LongWritable, IntWritable, FloatWritable, IntWritable> - createVertex(job.getConfiguration(), gs); - System.out.println("testInstantiateVertex: superstep=" + - vertex.getSuperstep()); - VertexInputFormat<LongWritable, IntWritable, FloatWritable> - inputFormat = - BspUtils.<LongWritable, IntWritable, FloatWritable> - createVertexInputFormat(job.getConfiguration()); + BasicVertex<LongWritable, IntWritable, FloatWritable, IntWritable> vertex = + BspUtils.createVertex(job.getConfiguration()); + System.out.println("testInstantiateVertex: Got vertex " + vertex + + ", graphState" + gs); + VertexInputFormat<LongWritable, IntWritable, FloatWritable, IntWritable> + inputFormat = BspUtils.createVertexInputFormat(job.getConfiguration()); List<InputSplit> splitArray = inputFormat.getSplits( new JobContext(new Configuration(), new JobID()), 1); @@ -285,9 +282,11 @@ public class TestBspBasic extends BspCas System.out.println("testBspPageRank: maxPageRank=" + maxPageRank + " minPageRank=" + minPageRank + " numVertices=" + numVertices); - assertTrue(maxPageRank > 34.030 && maxPageRank < 34.0301); - assertTrue(minPageRank > 0.03 && minPageRank < 0.03001); - assertTrue(numVertices == 5); + assertTrue("34.030 !< " + maxPageRank + " !< " + " 34.0301", + maxPageRank > 34.030 && maxPageRank < 34.0301); + assertTrue("0.03 !< " + minPageRank + " !< " + "0.03001", + minPageRank > 0.03 && minPageRank < 0.03001); + assertTrue("numVertices = " + numVertices + " != 5", numVertices == 5); } } Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestAdjacencyListTextVertexOutputFormat.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestAdjacencyListTextVertexOutputFormat.java?rev=1196639&r1=1196638&r2=1196639&view=diff ============================================================================== --- incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestAdjacencyListTextVertexOutputFormat.java (original) +++ incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestAdjacencyListTextVertexOutputFormat.java Wed Nov 2 15:40:02 2011 @@ -30,11 +30,8 @@ import org.mockito.Matchers; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; - import static org.apache.giraph.lib.AdjacencyListTextVertexOutputFormat.AdjacencyListVertexWriter; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -51,7 +48,7 @@ public class TestAdjacencyListTextVertex // Create empty iterator == no edges when(vertex.iterator()).thenReturn(new ArrayList<Text>().iterator()); - RecordWriter<Text,Text> tw = mock(RecordWriter.class); + RecordWriter<Text, Text> tw = mock(RecordWriter.class); AdjacencyListVertexWriter writer = new AdjacencyListVertexWriter(tw); writer.initialize(tac); writer.writeVertex(vertex); Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1196639&r1=1196638&r2=1196639&view=diff ============================================================================== --- incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java (original) +++ incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java Wed Nov 2 15:40:02 2011 @@ -19,8 +19,11 @@ package org.apache.giraph.lib; import junit.framework.TestCase; +import org.apache.giraph.graph.BasicVertex; +import org.apache.giraph.graph.Edge; import org.apache.giraph.graph.GiraphJob; -import org.apache.giraph.graph.MutableVertex; +import org.apache.giraph.graph.GraphState; +import org.apache.giraph.graph.Vertex; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.DoubleWritable; @@ -31,10 +34,11 @@ import org.apache.hadoop.mapreduce.Recor import org.apache.hadoop.mapreduce.TaskAttemptContext; import java.io.IOException; +import java.util.Iterator; +import static org.apache.giraph.lib.TestTextDoubleDoubleAdjacencyListVertexInputFormat.assertValidVertex; +import static org.apache.giraph.lib.TestTextDoubleDoubleAdjacencyListVertexInputFormat.setGraphState; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; public class TestLongDoubleDoubleAdjacencyListVertexInputFormat extends TestCase { @@ -42,13 +46,16 @@ public class TestLongDoubleDoubleAdjacen private RecordReader<LongWritable, Text> rr; private Configuration conf; private TaskAttemptContext tac; + private GraphState<LongWritable, DoubleWritable, DoubleWritable, BooleanWritable> graphState; public void setUp() throws IOException, InterruptedException { rr = mock(RecordReader.class); when(rr.nextKeyValue()).thenReturn(true); conf = new Configuration(); + conf.setClass(GiraphJob.VERTEX_CLASS, DummyVertex.class, BasicVertex.class); conf.setClass(GiraphJob.VERTEX_INDEX_CLASS, LongWritable.class, Writable.class); conf.setClass(GiraphJob.VERTEX_VALUE_CLASS, DoubleWritable.class, Writable.class); + graphState = mock(GraphState.class); tac = mock(TaskAttemptContext.class); when(tac.getConfiguration()).thenReturn(conf); } @@ -57,15 +64,14 @@ public class TestLongDoubleDoubleAdjacen String input = "123"; when(rr.getCurrentValue()).thenReturn(new Text(input)); - LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr = - new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr); + LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr = + new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr); vr.initialize(null, tac); - MutableVertex<LongWritable, DoubleWritable, DoubleWritable, BooleanWritable> - mutableVertex = mock(MutableVertex.class); try { - vr.next(mutableVertex); + vr.nextVertex(); + vr.getCurrentVertex(); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException iae) { assertTrue(iae.getMessage().startsWith("Line did not split correctly: ")); @@ -80,53 +86,60 @@ public class TestLongDoubleDoubleAdjacen new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr); vr.initialize(null, tac); - MutableVertex<LongWritable, DoubleWritable, DoubleWritable, BooleanWritable> - mutableVertex = mock(MutableVertex.class); + try { - vr.next(mutableVertex); + vr.nextVertex(); + vr.getCurrentVertex(); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException iae) { assertTrue(iae.getMessage().startsWith("Line did not split correctly: ")); } } - public void testHappyPath() throws IOException, InterruptedException { + public void testHappyPath() throws Exception { String input = "42\t0.1\t99\t0.2\t2000\t0.3\t4000\t0.4"; when(rr.getCurrentValue()).thenReturn(new Text(input)); - LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr = - new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr); + LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr = + new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr); vr.initialize(null, tac); - MutableVertex<LongWritable, DoubleWritable, DoubleWritable, BooleanWritable> - mutableVertex = mock(MutableVertex.class); - assertTrue("Should have been able to read vertex", vr.next(mutableVertex)); - verify(mutableVertex).setVertexId(new LongWritable(42)); - verify(mutableVertex).setVertexValue(new DoubleWritable(0.1d)); - verify(mutableVertex).addEdge(new LongWritable(99l), new DoubleWritable(0.2d)); - verify(mutableVertex).addEdge(new LongWritable(2000l), new DoubleWritable(0.3d)); - verify(mutableVertex).addEdge(new LongWritable(4000l), new DoubleWritable(0.4d)); - verifyNoMoreInteractions(mutableVertex); + assertTrue("Should have been able to read vertex", vr.nextVertex()); + BasicVertex<LongWritable, DoubleWritable, DoubleWritable, BooleanWritable> + vertex = vr.getCurrentVertex(); + setGraphState(vertex, graphState); + assertValidVertex(conf, graphState, vertex, + new LongWritable(42), new DoubleWritable(0.1), + new Edge<LongWritable, DoubleWritable>(new LongWritable(99), new DoubleWritable(0.2)), + new Edge<LongWritable, DoubleWritable>(new LongWritable(2000), new DoubleWritable(0.3)), + new Edge<LongWritable, DoubleWritable>(new LongWritable(4000), new DoubleWritable(0.4))); + assertEquals(vertex.getNumOutEdges(), 3); } - public void testDifferentSeparators() throws IOException, InterruptedException { + public void testDifferentSeparators() throws Exception { String input = "12345:42.42:9999999:99.9"; when(rr.getCurrentValue()).thenReturn(new Text(input)); conf.set(AdjacencyListVertexReader.LINE_TOKENIZE_VALUE, ":"); - LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr = - new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr); + LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr = + new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr); vr.initialize(null, tac); - MutableVertex<LongWritable, DoubleWritable, DoubleWritable, BooleanWritable> - mutableVertex = mock(MutableVertex.class); - assertTrue("Should have been able to read vertex", vr.next(mutableVertex)); - verify(mutableVertex).setVertexId(new LongWritable(12345l)); - verify(mutableVertex).setVertexValue(new DoubleWritable(42.42d)); - verify(mutableVertex).addEdge(new LongWritable(9999999l), - new DoubleWritable(99.9d)); - verifyNoMoreInteractions(mutableVertex); + assertTrue("Should have been able to read vertex", vr.nextVertex()); + BasicVertex<LongWritable, DoubleWritable, DoubleWritable, BooleanWritable> + vertex = vr.getCurrentVertex(); + setGraphState(vertex, graphState); + assertValidVertex(conf, graphState, vertex, new LongWritable(12345), new DoubleWritable(42.42), + new Edge<LongWritable, DoubleWritable>(new LongWritable(9999999), new DoubleWritable(99.9))); + assertEquals(vertex.getNumOutEdges(), 1); } + public static class DummyVertex + extends Vertex<LongWritable, DoubleWritable, DoubleWritable, BooleanWritable> { + @Override + public void compute(Iterator<BooleanWritable> msgIterator) throws IOException { + // ignore + } + } } Modified: incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1196639&r1=1196638&r2=1196639&view=diff ============================================================================== --- incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java (original) +++ incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestTextDoubleDoubleAdjacencyListVertexInputFormat.java Wed Nov 2 15:40:02 2011 @@ -18,23 +18,33 @@ package org.apache.giraph.lib; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import junit.framework.TestCase; +import org.apache.giraph.graph.BasicVertex; +import org.apache.giraph.graph.BspUtils; +import org.apache.giraph.graph.Edge; import org.apache.giraph.graph.GiraphJob; -import org.apache.giraph.graph.MutableVertex; +import org.apache.giraph.graph.GraphState; +import org.apache.giraph.graph.Vertex; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import java.io.IOException; +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; public class TestTextDoubleDoubleAdjacencyListVertexInputFormat extends TestCase { @@ -42,13 +52,16 @@ public class TestTextDoubleDoubleAdjacen private RecordReader<LongWritable, Text> rr; private Configuration conf; private TaskAttemptContext tac; + private GraphState<Text, DoubleWritable, DoubleWritable, BooleanWritable> graphState; public void setUp() throws IOException, InterruptedException { rr = mock(RecordReader.class); when(rr.nextKeyValue()).thenReturn(true).thenReturn(false); conf = new Configuration(); + conf.setClass(GiraphJob.VERTEX_CLASS, DummyVertex.class, BasicVertex.class); conf.setClass(GiraphJob.VERTEX_INDEX_CLASS, Text.class, Writable.class); conf.setClass(GiraphJob.VERTEX_VALUE_CLASS, DoubleWritable.class, Writable.class); + graphState = mock(GraphState.class); tac = mock(TaskAttemptContext.class); when(tac.getConfiguration()).thenReturn(conf); } @@ -57,15 +70,14 @@ public class TestTextDoubleDoubleAdjacen String input = "hi"; when(rr.getCurrentValue()).thenReturn(new Text(input)); - TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr = - new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr); + TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr = + new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr); vr.initialize(null, tac); - MutableVertex<Text, DoubleWritable, DoubleWritable, BooleanWritable> - mutableVertex = mock(MutableVertex.class); try { - vr.next(mutableVertex); + vr.nextVertex(); + vr.getCurrentVertex(); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException iae) { assertTrue(iae.getMessage().startsWith("Line did not split correctly: ")); @@ -76,41 +88,83 @@ public class TestTextDoubleDoubleAdjacen String input = "index\t55.66\tindex2"; when(rr.getCurrentValue()).thenReturn(new Text(input)); - TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr = - new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr); - + TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr = + new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr); vr.initialize(null, tac); - MutableVertex<Text, DoubleWritable, DoubleWritable, BooleanWritable> - mutableVertex = mock(MutableVertex.class); try { - vr.next(mutableVertex); + vr.nextVertex(); + vr.getCurrentVertex(); fail("Should have thrown an IllegalArgumentException"); } catch (IllegalArgumentException iae) { assertTrue(iae.getMessage().startsWith("Line did not split correctly: ")); } } - public void testHappyPath() throws IOException, InterruptedException { + public static void setGraphState(BasicVertex vertex, GraphState graphState) throws Exception { + Class<? extends BasicVertex> c = BasicVertex.class; + Method m = c.getDeclaredMethod("setGraphState", GraphState.class); + m.setAccessible(true); + m.invoke(vertex, graphState); + } + + public static <I extends WritableComparable, V extends Writable, + E extends Writable, M extends Writable> void assertValidVertex(Configuration conf, + GraphState<I, V, E, M> graphState, BasicVertex<I, V, E, M> actual, + I expectedId, V expectedValue, Edge<I, E>... edges) + throws Exception { + BasicVertex<I, V, E, M> expected = BspUtils.createVertex(conf); + setGraphState(expected, graphState); + + // FIXME! maybe can't work if not instantiated properly + Map<I, E> edgeMap = Maps.newHashMap(); + for(Edge<I, E> edge : edges) { + edgeMap.put(edge.getDestVertexId(), edge.getEdgeValue()); + } + expected.initialize(expectedId, expectedValue, edgeMap, null); + assertValid(expected, actual); + } + + public static + <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> void + assertValid(BasicVertex<I, V, E, M> expected, BasicVertex<I, V, E, M> actual) { + assertEquals(expected.getVertexId(), actual.getVertexId()); + assertEquals(expected.getVertexValue(), actual.getVertexValue()); + assertEquals(expected.getNumEdges(), actual.getNumEdges()); + List<Edge<I, E>> expectedEdges = Lists.newArrayList(); + List<Edge<I, E>> actualEdges = Lists.newArrayList(); + for(I actualDestId : actual) { + actualEdges.add(new Edge<I, E>(actualDestId, actual.getEdgeValue(actualDestId))); + } + for(I expectedDestId : expected) { + expectedEdges.add(new Edge<I, E>(expectedDestId, expected.getEdgeValue(expectedDestId))); + } + Collections.sort(expectedEdges); + Collections.sort(actualEdges); + for(int i = 0; i < expectedEdges.size(); i++) { + assertEquals(expectedEdges.get(i), actualEdges.get(i)); + } + } + + public void testHappyPath() throws Exception { String input = "Hi\t0\tCiao\t1.123\tBomdia\t2.234\tOla\t3.345"; when(rr.getCurrentValue()).thenReturn(new Text(input)); - TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr = - new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr); + TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr = + new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr); vr.initialize(null, tac); - MutableVertex<Text, DoubleWritable, DoubleWritable, BooleanWritable> - mutableVertex = mock(MutableVertex.class); - - assertTrue("Should have been able to read vertex", vr.next(mutableVertex)); - verify(mutableVertex).setVertexId(new Text("Hi")); - verify(mutableVertex).setVertexValue(new DoubleWritable(0)); - verify(mutableVertex).addEdge(new Text("Ciao"), new DoubleWritable(1.123d)); - verify(mutableVertex).addEdge(new Text("Bomdia"), new DoubleWritable(2.234d)); - verify(mutableVertex).addEdge(new Text("Ola"), new DoubleWritable(3.345d)); - verifyNoMoreInteractions(mutableVertex); + assertTrue("Should have been able to add a vertex", vr.nextVertex()); + BasicVertex<Text, DoubleWritable, DoubleWritable, BooleanWritable> vertex = + vr.getCurrentVertex(); + setGraphState(vertex, graphState); + assertValidVertex(conf, graphState, vertex, new Text("Hi"), new DoubleWritable(0), + new Edge<Text, DoubleWritable>(new Text("Ciao"), new DoubleWritable(1.123d)), + new Edge<Text, DoubleWritable>(new Text("Bomdia"), new DoubleWritable(2.234d)), + new Edge<Text, DoubleWritable>(new Text("Ola"), new DoubleWritable(3.345d))); + assertEquals(vertex.getNumOutEdges(), 3); } - public void testLineSanitizer() throws IOException, InterruptedException { + public void testLineSanitizer() throws Exception { String input = "Bye\t0.01\tCiao\t1.001\tTchau\t2.0001\tAdios\t3.00001"; AdjacencyListVertexReader.LineSanitizer toUpper = @@ -122,37 +176,46 @@ public class TestTextDoubleDoubleAdjacen }; when(rr.getCurrentValue()).thenReturn(new Text(input)); - TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr = - new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr, toUpper); + TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr = + new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr, toUpper); vr.initialize(null, tac); - MutableVertex<Text, DoubleWritable, DoubleWritable, BooleanWritable> - mutableVertex = mock(MutableVertex.class); - assertTrue("Should have been able to read vertex", vr.next(mutableVertex)); - verify(mutableVertex).setVertexId(new Text("BYE")); - verify(mutableVertex).setVertexValue(new DoubleWritable(0.01d)); - verify(mutableVertex).addEdge(new Text("CIAO"), new DoubleWritable(1.001d)); - verify(mutableVertex).addEdge(new Text("TCHAU"), new DoubleWritable(2.0001d)); - verify(mutableVertex).addEdge(new Text("ADIOS"), new DoubleWritable(3.00001d)); - verifyNoMoreInteractions(mutableVertex); + assertTrue("Should have been able to read vertex", vr.nextVertex()); + BasicVertex<Text, DoubleWritable, DoubleWritable, BooleanWritable> vertex = + vr.getCurrentVertex(); + setGraphState(vertex, graphState); + assertValidVertex(conf, graphState, vertex, + new Text("BYE"), new DoubleWritable(0.01d), + new Edge<Text, DoubleWritable>(new Text("CIAO"), new DoubleWritable(1.001d)), + new Edge<Text, DoubleWritable>(new Text("TCHAU"), new DoubleWritable(2.0001d)), + new Edge<Text, DoubleWritable>(new Text("ADIOS"), new DoubleWritable(3.00001d))); + + assertEquals(vertex.getNumOutEdges(), 3); } - public void testDifferentSeparators() throws IOException, InterruptedException { + public void testDifferentSeparators() throws Exception { String input = "alpha:42:beta:99"; when(rr.getCurrentValue()).thenReturn(new Text(input)); conf.set(AdjacencyListVertexReader.LINE_TOKENIZE_VALUE, ":"); - TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr = - new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr); + TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable> vr = + new TextDoubleDoubleAdjacencyListVertexInputFormat.VertexReader<BooleanWritable>(rr); vr.initialize(null, tac); - MutableVertex<Text, DoubleWritable, DoubleWritable, BooleanWritable> - mutableVertex = mock(MutableVertex.class); - assertTrue("Should have been able to read vertex", vr.next(mutableVertex)); - verify(mutableVertex).setVertexId(new Text("alpha")); - verify(mutableVertex).setVertexValue(new DoubleWritable(42)); - verify(mutableVertex).addEdge(new Text("beta"), new DoubleWritable(99)); - verifyNoMoreInteractions(mutableVertex); + assertTrue("Should have been able to read vertex", vr.nextVertex()); + BasicVertex<Text, DoubleWritable, DoubleWritable, BooleanWritable> vertex = + vr.getCurrentVertex(); + setGraphState(vertex, graphState); + assertValidVertex(conf, graphState, vertex, new Text("alpha"), new DoubleWritable(42d), + new Edge<Text, DoubleWritable>(new Text("beta"), new DoubleWritable(99d))); + assertEquals(vertex.getNumOutEdges(), 1); } + public static class DummyVertex + extends Vertex<Text, DoubleWritable, DoubleWritable, BooleanWritable> { + @Override + public void compute(Iterator<BooleanWritable> msgIterator) throws IOException { + // ignore + } + } }
