Author: jghoman
Date: Wed Oct 26 19:23:23 2011
New Revision: 1189402

URL: http://svn.apache.org/viewvc?rev=1189402&view=rev
Log:
GIRAPH-62. Provide input format for reading graphs stored as adjacency lists.

Added:
    incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java
    incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java
    incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/
    incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
Modified:
    incubator/giraph/trunk/CHANGELOG
    incubator/giraph/trunk/pom.xml
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1189402&r1=1189401&r2=1189402&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Wed Oct 26 19:23:23 2011
@@ -2,6 +2,9 @@ Giraph Change Log
 
 Release 0.70.0 - unreleased
 
+  GIRAPH-62: Provide input format for reading graphs stored as adjacency 
+  lists. (jghoman)
+
   GIRAPH-59: Missing some test if debug enabled before LOG.debug() and
   LOG.info(). (guzhiwei via aching).
 

Modified: incubator/giraph/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/pom.xml?rev=1189402&r1=1189401&r2=1189402&view=diff
==============================================================================
--- incubator/giraph/trunk/pom.xml (original)
+++ incubator/giraph/trunk/pom.xml Wed Oct 26 19:23:23 2011
@@ -484,5 +484,11 @@ under the License.
       <artifactId>json</artifactId>
       <version>20090211</version>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.8.5</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java?rev=1189402&r1=1189401&r2=1189402&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java Wed Oct 26 19:23:23 2011
@@ -41,7 +41,7 @@ public class Edge<I extends WritableComp
     private I destVertexId = null;
     /** Edge value */
     private E edgeValue = null;
-    /** Configuration - Used to instiantiate classes */
+    /** Configuration - Used to instantiate classes */
     private Configuration conf = null;
 
     /**
@@ -78,6 +78,24 @@ public class Edge<I extends WritableComp
         return edgeValue;
     }
 
+    /**
+     * Point this edge at a different destination vertex.
+     *
+     * @param destVertexId id of the vertex this edge should lead to
+     */
+    public void setDestVertexId(I destVertexId) {
+        this.destVertexId = destVertexId;
+    }
+
+    /**
+     * Replace the value carried by this edge.
+     *
+     * @param edgeValue value to store on this edge
+     */
+    public void setEdgeValue(E edgeValue) {
+        this.edgeValue = edgeValue;
+    }
+
     @Override
     public String toString() {
         return "(DestVertexIndex = " + destVertexId +

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java?rev=1189402&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java Wed Oct 26 19:23:23 2011
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.lib;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.MutableVertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordReader;
+
+import java.io.IOException;
+
+/**
+ * VertexReader that reads lines of text with vertices encoded as adjacency
+ * lists and converts each token to the correct type.  For example, a graph
+ * with vertices as integers and values as doubles could be encoded as:
+ *   1 0.1 2 0.2 3 0.3
+ * to represent a vertex named 1, with 0.1 as its value and two edges, to
+ * vertices 2 and 3, with edge values of 0.2 and 0.3, respectively.
+ *
+ * Tokens are separated by the value of the {@link #LINE_TOKENIZE_VALUE}
+ * configuration key, which defaults to a single tab (the example above uses
+ * spaces only for readability).  That value is handed to
+ * {@link String#split(String)}, so it is interpreted as a regular
+ * expression.  Each line must hold an even number of tokens: the vertex id,
+ * the vertex value, then zero or more (destination id, edge value) pairs.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+abstract class AdjacencyListVertexReader<I extends WritableComparable,
+    V extends Writable, E extends Writable> extends
+    TextVertexInputFormat.TextVertexReader<I, V, E> {
+
+  /** Configuration key holding the token separator (a regular expression). */
+  public static final String LINE_TOKENIZE_VALUE = "adj.list.input.token";
+  /** Separator used when {@link #LINE_TOKENIZE_VALUE} is not set: a tab. */
+  public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
+
+  /** Separator read from the configuration on the first call to next(). */
+  private String splitValue = null;
+
+  /**
+   * Utility for doing any cleaning of each line before it is tokenized.
+   */
+  public interface LineSanitizer {
+    /**
+     * Clean string s before attempting to tokenize it.
+     *
+     * @param s Line as read from the input
+     * @return Line that will be tokenized
+     */
+    public String sanitize(String s);
+  }
+
+  /** Optional line cleaner; when null, lines are tokenized as read. */
+  private LineSanitizer sanitizer = null;
+
+  /**
+   * Create a reader that tokenizes each line exactly as read.
+   *
+   * @param lineRecordReader Reader supplying one line of text per record
+   */
+  AdjacencyListVertexReader(
+      RecordReader<LongWritable, Text> lineRecordReader) {
+    super(lineRecordReader);
+  }
+
+  /**
+   * Create a reader that passes each line through a sanitizer first.
+   *
+   * @param lineRecordReader Reader supplying one line of text per record
+   * @param sanitizer Cleaner applied to every line before tokenizing
+   */
+  AdjacencyListVertexReader(RecordReader<LongWritable, Text> lineRecordReader,
+      LineSanitizer sanitizer) {
+    super(lineRecordReader);
+    this.sanitizer = sanitizer;
+  }
+
+  /**
+   * Store the Id for this line in an instance of its correct type.
+   *
+   * @param s Id of vertex from line
+   * @param id Instance of Id's type, in which to store its value
+   */
+  public abstract void decodeId(String s, I id);
+
+  /**
+   * Store the value for this line in an instance of its correct type.
+   *
+   * @param s Value from line
+   * @param value Instance of value's type, in which to store its value
+   */
+  public abstract void decodeValue(String s, V value);
+
+  /**
+   * Store an edge from the line into an instance of a correctly typed Edge.
+   *
+   * @param id The edge's id from the line
+   * @param value The edge's value from the line
+   * @param edge Instance of edge in which to store the id and value
+   */
+  public abstract void decodeEdge(String id, String value, Edge<I, E> edge);
+
+  /**
+   * Read the next line and populate the given vertex with its id, value and
+   * outgoing edges.
+   *
+   * @param iveMutableVertex Vertex to populate from the line
+   * @return false if the underlying record reader has no more lines,
+   *         true otherwise
+   * @throws IllegalArgumentException if the line does not split into an id,
+   *         a value and complete (destination id, edge value) pairs
+   */
+  @Override
+  public boolean next(MutableVertex<I, V, E, ?> iveMutableVertex)
+      throws IOException, InterruptedException {
+    if (!getRecordReader().nextKeyValue()) {
+      return false;
+    }
+
+    Configuration conf = getContext().getConfiguration();
+    String line = getRecordReader().getCurrentValue().toString();
+
+    if (sanitizer != null) {
+      line = sanitizer.sanitize(line);
+    }
+
+    // Look the separator up once and reuse it for every later line.
+    if (splitValue == null) {
+      splitValue = conf.get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
+    }
+
+    String[] tokens = line.split(splitValue);
+
+    // An id and a value are mandatory, and edges come in
+    // (destination, value) pairs, so the token count must be even and >= 2.
+    if ((tokens.length < 2) || (tokens.length % 2 != 0)) {
+      throw new IllegalArgumentException(
+          "Line did not split correctly: " + line);
+    }
+
+    I vertexId = BspUtils.<I>createVertexIndex(conf);
+    decodeId(tokens[0], vertexId);
+    iveMutableVertex.setVertexId(vertexId);
+
+    V value = BspUtils.<V>createVertexValue(conf);
+    decodeValue(tokens[1], value);
+    iveMutableVertex.setVertexValue(value);
+
+    for (int i = 2; i < tokens.length; i += 2) {
+      // A fresh Edge per pair: a decodeEdge implementation that fills in
+      // objects already held by the Edge would otherwise hand the same
+      // id/value instances to every addEdge call.
+      Edge<I, E> edge = new Edge<I, E>();
+      decodeEdge(tokens[i], tokens[i + 1], edge);
+      iveMutableVertex.addEdge(edge.getDestVertexId(), edge.getEdgeValue());
+    }
+
+    return true;
+  }
+}

Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1189402&view=auto
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java (added)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java Wed Oct 26 19:23:23 2011
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.lib;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * InputFormat for reading graphs stored as (ordered) adjacency lists
+ * with the vertex ids longs and the vertex values and edges doubles.
+ * For example:
+ * 22 0.1 45 0.3 99 0.44
+ * to represent a vertex with id 22, value of 0.1 and edges to nodes 45 and 99,
+ * with values of 0.3 and 0.44, respectively.  Tokens are tab-separated
+ * unless the {@code adj.list.input.token} configuration key says otherwise.
+ */
+public class LongDoubleDoubleAdjacencyListVertexInputFormat extends
+    TextVertexInputFormat<LongWritable, DoubleWritable, DoubleWritable>  {
+
+  /**
+   * Adjacency-list reader that decodes vertex ids and edge destinations as
+   * longs, and vertex and edge values as doubles.
+   */
+  static class VertexReader extends
+      AdjacencyListVertexReader<LongWritable, DoubleWritable, DoubleWritable> {
+
+    /**
+     * @param lineRecordReader Reader supplying one line of text per record
+     */
+    VertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
+      super(lineRecordReader);
+    }
+
+    /**
+     * @param lineRecordReader Reader supplying one line of text per record
+     * @param sanitizer Cleaner applied to every line before tokenizing
+     */
+    VertexReader(RecordReader<LongWritable, Text> lineRecordReader,
+                 LineSanitizer sanitizer) {
+      super(lineRecordReader, sanitizer);
+    }
+
+    @Override
+    public void decodeId(String s, LongWritable id) {
+      id.set(Long.parseLong(s));
+    }
+
+    @Override
+    public void decodeValue(String s, DoubleWritable value) {
+      value.set(Double.parseDouble(s));
+    }
+
+    @Override
+    public void decodeEdge(String destId, String edgeValue,
+        Edge<LongWritable, DoubleWritable> edge) {
+      // New Writables per edge, so edges never share id/value instances.
+      edge.setDestVertexId(new LongWritable(Long.parseLong(destId)));
+      edge.setEdgeValue(new DoubleWritable(Double.parseDouble(edgeValue)));
+    }
+  }
+
+  /**
+   * Create a reader for the given split that parses each line with
+   * {@link VertexReader}.
+   *
+   * @param split Split to read
+   * @param context Task context
+   * @return Reader producing one vertex per line
+   * @throws IOException if the underlying line reader cannot be created
+   */
+  @Override
+  public org.apache.giraph.graph.VertexReader createVertexReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return new VertexReader(textInputFormat.createRecordReader(split, context));
+  }
+}

Added: incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1189402&view=auto
==============================================================================
--- incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java (added)
+++ incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java Wed Oct 26 19:23:23 2011
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.lib;
+
+
+import junit.framework.TestCase;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.MutableVertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class TestLongDoubleDoubleAdjacencyListVertexInputFormat extends 
TestCase {
+
+  private RecordReader<LongWritable, Text> rr;
+  private Configuration conf;
+  private TaskAttemptContext tac;
+
+  public void setUp() throws IOException, InterruptedException {
+    rr = mock(RecordReader.class);
+    when(rr.nextKeyValue()).thenReturn(true);
+    conf = new Configuration();
+    conf.setClass(GiraphJob.VERTEX_INDEX_CLASS, LongWritable.class, 
Writable.class);
+    conf.setClass(GiraphJob.VERTEX_VALUE_CLASS, DoubleWritable.class, 
Writable.class);
+    tac = mock(TaskAttemptContext.class);
+    when(tac.getConfiguration()).thenReturn(conf);
+  }
+
+  public void testIndexMustHaveValue() throws IOException, 
InterruptedException {
+    String input = "123";
+
+    when(rr.getCurrentValue()).thenReturn(new Text(input));
+    LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr =
+        new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr);
+
+    vr.initialize(null, tac);
+    MutableVertex<LongWritable, DoubleWritable, DoubleWritable, 
BooleanWritable>
+        mutableVertex = mock(MutableVertex.class);
+
+    try {
+      vr.next(mutableVertex);
+      fail("Should have thrown an IllegalArgumentException");
+    } catch (IllegalArgumentException iae) {
+      assertTrue(iae.getMessage().startsWith("Line did not split correctly: 
"));
+    }
+  }
+
+  public void testEdgesMustHaveValues() throws IOException, 
InterruptedException {
+    String input = "99\t55.2\t100";
+
+    when(rr.getCurrentValue()).thenReturn(new Text(input));
+    LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr =
+        new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr);
+
+    vr.initialize(null, tac);
+    MutableVertex<LongWritable, DoubleWritable, DoubleWritable, 
BooleanWritable>
+            mutableVertex = mock(MutableVertex.class);
+    try {
+      vr.next(mutableVertex);
+      fail("Should have thrown an IllegalArgumentException");
+    } catch (IllegalArgumentException iae) {
+      assertTrue(iae.getMessage().startsWith("Line did not split correctly: 
"));
+    }
+  }
+
+  public void testHappyPath() throws IOException, InterruptedException {
+    String input = "42\t0.1\t99\t0.2\t2000\t0.3\t4000\t0.4";
+
+    when(rr.getCurrentValue()).thenReturn(new Text(input));
+    LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr =
+        new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr);
+
+    vr.initialize(null, tac);
+    MutableVertex<LongWritable, DoubleWritable, DoubleWritable, 
BooleanWritable>
+        mutableVertex = mock(MutableVertex.class);
+
+    assertTrue("Should have been able to read vertex", vr.next(mutableVertex));
+    verify(mutableVertex).setVertexId(new LongWritable(42));
+    verify(mutableVertex).setVertexValue(new DoubleWritable(0.1d));
+    verify(mutableVertex).addEdge(new LongWritable(99l), new 
DoubleWritable(0.2d));
+    verify(mutableVertex).addEdge(new LongWritable(2000l), new 
DoubleWritable(0.3d));
+    verify(mutableVertex).addEdge(new LongWritable(4000l), new 
DoubleWritable(0.4d));
+    verifyNoMoreInteractions(mutableVertex);
+  }
+
+  public void testDifferentSeparators() throws IOException, 
InterruptedException {
+    String input = "12345:42.42:9999999:99.9";
+
+    when(rr.getCurrentValue()).thenReturn(new Text(input));
+    conf.set(AdjacencyListVertexReader.LINE_TOKENIZE_VALUE, ":");
+    LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr =
+        new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr);
+
+    vr.initialize(null, tac);
+    MutableVertex<LongWritable, DoubleWritable, DoubleWritable, 
BooleanWritable>
+        mutableVertex = mock(MutableVertex.class);
+    assertTrue("Should have been able to read vertex", vr.next(mutableVertex));
+    verify(mutableVertex).setVertexId(new LongWritable(12345l));
+    verify(mutableVertex).setVertexValue(new DoubleWritable(42.42d));
+    verify(mutableVertex).addEdge(new LongWritable(9999999l),
+        new DoubleWritable(99.9d));
+    verifyNoMoreInteractions(mutableVertex);
+  }
+
+}


Reply via email to