Author: jghoman
Date: Wed Oct 26 19:23:23 2011
New Revision: 1189402
URL: http://svn.apache.org/viewvc?rev=1189402&view=rev
Log:
GIRAPH-62. Provide input format for reading graphs stored as adjacency lists.
Added:
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java
incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/
incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
Modified:
incubator/giraph/trunk/CHANGELOG
incubator/giraph/trunk/pom.xml
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java
Modified: incubator/giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1189402&r1=1189401&r2=1189402&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Wed Oct 26 19:23:23 2011
@@ -2,6 +2,9 @@ Giraph Change Log
Release 0.70.0 - unreleased
+ GIRAPH-62: Provide input format for reading graphs stored as adjacency
+ lists. (jghoman)
+
GIRAPH-59: Missing some test if debug enabled before LOG.debug() and
LOG.info(). (guzhiwei via aching).
Modified: incubator/giraph/trunk/pom.xml
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/pom.xml?rev=1189402&r1=1189401&r2=1189402&view=diff
==============================================================================
--- incubator/giraph/trunk/pom.xml (original)
+++ incubator/giraph/trunk/pom.xml Wed Oct 26 19:23:23 2011
@@ -484,5 +484,11 @@ under the License.
<artifactId>json</artifactId>
<version>20090211</version>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.8.5</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java?rev=1189402&r1=1189401&r2=1189402&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java
(original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/Edge.java Wed
Oct 26 19:23:23 2011
@@ -41,7 +41,7 @@ public class Edge<I extends WritableComp
private I destVertexId = null;
/** Edge value */
private E edgeValue = null;
- /** Configuration - Used to instiantiate classes */
+ /** Configuration - Used to instantiate classes */
private Configuration conf = null;
/**
@@ -78,6 +78,24 @@ public class Edge<I extends WritableComp
return edgeValue;
}
+  /**
+   * Point this edge at a different vertex.
+   *
+   * @param destVertexId Id of the vertex this edge should now lead to
+   */
+  public void setDestVertexId(final I destVertexId) {
+    this.destVertexId = destVertexId;
+  }
+
+  /**
+   * Replace the value carried by this edge.
+   *
+   * @param edgeValue Value to associate with this edge from now on
+   */
+  public void setEdgeValue(final E edgeValue) {
+    this.edgeValue = edgeValue;
+  }
+
@Override
public String toString() {
return "(DestVertexIndex = " + destVertexId +
Added:
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java?rev=1189402&view=auto
==============================================================================
---
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java
(added)
+++
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/AdjacencyListVertexReader.java
Wed Oct 26 19:23:23 2011
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.lib;
+
+import org.apache.giraph.graph.BspUtils;
+import org.apache.giraph.graph.Edge;
+import org.apache.giraph.graph.MutableVertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordReader;
+
+import java.io.IOException;
+
+/**
+ * VertexReader that reads lines of text with vertices encoded as adjacency
+ * lists and converts each token to the correct type. For example, a graph
+ * with vertices as integers and values as doubles could be encoded as:
+ *   1 0.1 2 0.2 3 0.3
+ * to represent a vertex named 1, with 0.1 as its value and two edges, to
+ * vertices 2 and 3, with edge values of 0.2 and 0.3, respectively.
+ *
+ * Tokens are separated by the configuration value of
+ * {@link #LINE_TOKENIZE_VALUE}, defaulting to
+ * {@link #LINE_TOKENIZE_VALUE_DEFAULT} (a tab).
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+abstract class AdjacencyListVertexReader<I extends WritableComparable,
+    V extends Writable, E extends Writable> extends
+    TextVertexInputFormat.TextVertexReader<I, V, E> {
+
+  /**
+   * Configuration key holding the separator between tokens of a line.
+   * The value is handed to {@link String#split(String)}, so it is
+   * interpreted as a regular expression.
+   */
+  public static final String LINE_TOKENIZE_VALUE = "adj.list.input.token";
+  /** Separator used when {@link #LINE_TOKENIZE_VALUE} is not set: a tab. */
+  public static final String LINE_TOKENIZE_VALUE_DEFAULT = "\t";
+
+  /** Token separator; read lazily from the configuration on first use. */
+  private String splitValue = null;
+
+  /**
+   * Utility for doing any cleaning of each line before it is tokenized.
+   */
+  public interface LineSanitizer {
+    /**
+     * Clean string s before attempting to tokenize it.
+     *
+     * @param s Raw line as read from the input
+     * @return Line to tokenize
+     */
+    String sanitize(String s);
+  }
+
+  /** Optional cleaner applied to each line before tokenizing; may be null. */
+  private final LineSanitizer sanitizer;
+
+  /**
+   * Create a reader that tokenizes each line exactly as read.
+   *
+   * @param lineRecordReader Reader supplying one line of text per record
+   */
+  AdjacencyListVertexReader(
+      RecordReader<LongWritable, Text> lineRecordReader) {
+    this(lineRecordReader, null);
+  }
+
+  /**
+   * Create a reader that passes each line through a sanitizer first.
+   *
+   * @param lineRecordReader Reader supplying one line of text per record
+   * @param sanitizer Cleaner applied to each line before tokenizing, or
+   *        null to tokenize lines as read
+   */
+  AdjacencyListVertexReader(RecordReader<LongWritable, Text> lineRecordReader,
+      LineSanitizer sanitizer) {
+    super(lineRecordReader);
+    this.sanitizer = sanitizer;
+  }
+
+  /**
+   * Store the Id for this line in an instance of its correct type.
+   * @param s Id of vertex from line
+   * @param id Instance of Id's type, in which to store its value
+   */
+  public abstract void decodeId(String s, I id);
+
+  /**
+   * Store the value for this line in an instance of its correct type.
+   * @param s Value from line
+   * @param value Instance of value's type, in which to store its value
+   */
+  public abstract void decodeValue(String s, V value);
+
+  /**
+   * Store an edge from the line into an instance of a correctly typed Edge.
+   * A freshly constructed Edge is passed for every (id, value) pair.
+   * @param id The edge's id from the line
+   * @param value The edge's value from the line
+   * @param edge Instance of edge in which to store the id and value
+   */
+  public abstract void decodeEdge(String id, String value, Edge<I, E> edge);
+
+  /**
+   * Read the next line, decode it and populate the given vertex with its
+   * id, value and outgoing edges.
+   *
+   * @param iveMutableVertex Vertex to populate from the next line
+   * @return false when the underlying reader has no more lines, else true
+   * @throws IOException if the underlying record reader fails
+   * @throws InterruptedException if the underlying record reader is
+   *         interrupted
+   * @throws IllegalArgumentException if the line does not split into a
+   *         vertex id, a vertex value and complete (edge id, edge value)
+   *         pairs
+   */
+  @Override
+  public boolean next(MutableVertex<I, V, E, ?> iveMutableVertex)
+      throws IOException, InterruptedException {
+    if (!getRecordReader().nextKeyValue()) {
+      return false;
+    }
+
+    Configuration conf = getContext().getConfiguration();
+    String line = getRecordReader().getCurrentValue().toString();
+
+    if (sanitizer != null) {
+      line = sanitizer.sanitize(line);
+    }
+
+    if (splitValue == null) {
+      splitValue = conf.get(LINE_TOKENIZE_VALUE, LINE_TOKENIZE_VALUE_DEFAULT);
+    }
+
+    // Layout: id, value, then zero or more (edge id, edge value) pairs,
+    // so a well-formed line always has an even number (>= 2) of tokens.
+    String[] values = line.split(splitValue);
+    if ((values.length < 2) || (values.length % 2 != 0)) {
+      throw new IllegalArgumentException(
+          "Line did not split correctly: " + line);
+    }
+
+    I vertexId = BspUtils.<I>createVertexIndex(conf);
+    decodeId(values[0], vertexId);
+    iveMutableVertex.setVertexId(vertexId);
+
+    V value = BspUtils.<V>createVertexValue(conf);
+    decodeValue(values[1], value);
+    iveMutableVertex.setVertexValue(value);
+
+    for (int i = 2; i < values.length; i += 2) {
+      // A new Edge per pair: a decoder that reuses the objects already held
+      // by a shared Edge would otherwise make every added edge alias the
+      // same id and value instances.
+      Edge<I, E> edge = new Edge<I, E>();
+      decodeEdge(values[i], values[i + 1], edge);
+      iveMutableVertex.addEdge(edge.getDestVertexId(), edge.getEdgeValue());
+    }
+
+    return true;
+  }
+}
Added:
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1189402&view=auto
==============================================================================
---
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java
(added)
+++
incubator/giraph/trunk/src/main/java/org/apache/giraph/lib/LongDoubleDoubleAdjacencyListVertexInputFormat.java
Wed Oct 26 19:23:23 2011
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.lib;
+
+import org.apache.giraph.graph.Edge;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * InputFormat for reading graphs stored as (ordered) adjacency lists
+ * with long vertex ids and double vertex values and edge values.
+ * For example:
+ *   22 0.1 45 0.3 99 0.44
+ * to represent a vertex with id 22, value of 0.1 and edges to nodes 45
+ * and 99, with values of 0.3 and 0.44, respectively.
+ */
+public class LongDoubleDoubleAdjacencyListVertexInputFormat extends
+    TextVertexInputFormat<LongWritable, DoubleWritable, DoubleWritable> {
+
+  /**
+   * Adjacency-list reader decoding vertex ids as longs and vertex and edge
+   * values as doubles.
+   */
+  static class VertexReader extends
+      AdjacencyListVertexReader<LongWritable, DoubleWritable, DoubleWritable> {
+
+    /**
+     * @param lineRecordReader Reader supplying one line of text per record
+     */
+    VertexReader(RecordReader<LongWritable, Text> lineRecordReader) {
+      super(lineRecordReader);
+    }
+
+    /**
+     * @param lineRecordReader Reader supplying one line of text per record
+     * @param sanitizer Cleaner applied to each line before tokenizing
+     */
+    VertexReader(RecordReader<LongWritable, Text> lineRecordReader,
+        LineSanitizer sanitizer) {
+      super(lineRecordReader, sanitizer);
+    }
+
+    /**
+     * @throws NumberFormatException if {@code s} is not a valid long
+     */
+    @Override
+    public void decodeId(String s, LongWritable id) {
+      id.set(Long.parseLong(s));
+    }
+
+    /**
+     * @throws NumberFormatException if {@code s} is not a valid double
+     */
+    @Override
+    public void decodeValue(String s, DoubleWritable value) {
+      value.set(Double.parseDouble(s));
+    }
+
+    /**
+     * @throws NumberFormatException if {@code id} is not a valid long or
+     *         {@code value} is not a valid double
+     */
+    @Override
+    public void decodeEdge(String id, String value,
+        Edge<LongWritable, DoubleWritable> edge) {
+      edge.setDestVertexId(new LongWritable(Long.parseLong(id)));
+      edge.setEdgeValue(new DoubleWritable(Double.parseDouble(value)));
+    }
+  }
+
+  // The return type is fully qualified because the nested VertexReader
+  // class above shadows the simple name.
+  @Override
+  public org.apache.giraph.graph.VertexReader createVertexReader(
+      InputSplit split, TaskAttemptContext context) throws IOException {
+    return new VertexReader(textInputFormat.createRecordReader(split,
+        context));
+  }
+}
Added:
incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java?rev=1189402&view=auto
==============================================================================
---
incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
(added)
+++
incubator/giraph/trunk/src/test/java/org/apache/giraph/lib/TestLongDoubleDoubleAdjacencyListVertexInputFormat.java
Wed Oct 26 19:23:23 2011
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.lib;
+
+
+import junit.framework.TestCase;
+import org.apache.giraph.graph.GiraphJob;
+import org.apache.giraph.graph.MutableVertex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class TestLongDoubleDoubleAdjacencyListVertexInputFormat extends
TestCase {
+
+ private RecordReader<LongWritable, Text> rr;
+ private Configuration conf;
+ private TaskAttemptContext tac;
+
+ public void setUp() throws IOException, InterruptedException {
+ rr = mock(RecordReader.class);
+ when(rr.nextKeyValue()).thenReturn(true);
+ conf = new Configuration();
+ conf.setClass(GiraphJob.VERTEX_INDEX_CLASS, LongWritable.class,
Writable.class);
+ conf.setClass(GiraphJob.VERTEX_VALUE_CLASS, DoubleWritable.class,
Writable.class);
+ tac = mock(TaskAttemptContext.class);
+ when(tac.getConfiguration()).thenReturn(conf);
+ }
+
+ public void testIndexMustHaveValue() throws IOException,
InterruptedException {
+ String input = "123";
+
+ when(rr.getCurrentValue()).thenReturn(new Text(input));
+ LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr =
+ new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr);
+
+ vr.initialize(null, tac);
+ MutableVertex<LongWritable, DoubleWritable, DoubleWritable,
BooleanWritable>
+ mutableVertex = mock(MutableVertex.class);
+
+ try {
+ vr.next(mutableVertex);
+ fail("Should have thrown an IllegalArgumentException");
+ } catch (IllegalArgumentException iae) {
+ assertTrue(iae.getMessage().startsWith("Line did not split correctly:
"));
+ }
+ }
+
+ public void testEdgesMustHaveValues() throws IOException,
InterruptedException {
+ String input = "99\t55.2\t100";
+
+ when(rr.getCurrentValue()).thenReturn(new Text(input));
+ LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr =
+ new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr);
+
+ vr.initialize(null, tac);
+ MutableVertex<LongWritable, DoubleWritable, DoubleWritable,
BooleanWritable>
+ mutableVertex = mock(MutableVertex.class);
+ try {
+ vr.next(mutableVertex);
+ fail("Should have thrown an IllegalArgumentException");
+ } catch (IllegalArgumentException iae) {
+ assertTrue(iae.getMessage().startsWith("Line did not split correctly:
"));
+ }
+ }
+
+ public void testHappyPath() throws IOException, InterruptedException {
+ String input = "42\t0.1\t99\t0.2\t2000\t0.3\t4000\t0.4";
+
+ when(rr.getCurrentValue()).thenReturn(new Text(input));
+ LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr =
+ new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr);
+
+ vr.initialize(null, tac);
+ MutableVertex<LongWritable, DoubleWritable, DoubleWritable,
BooleanWritable>
+ mutableVertex = mock(MutableVertex.class);
+
+ assertTrue("Should have been able to read vertex", vr.next(mutableVertex));
+ verify(mutableVertex).setVertexId(new LongWritable(42));
+ verify(mutableVertex).setVertexValue(new DoubleWritable(0.1d));
+ verify(mutableVertex).addEdge(new LongWritable(99l), new
DoubleWritable(0.2d));
+ verify(mutableVertex).addEdge(new LongWritable(2000l), new
DoubleWritable(0.3d));
+ verify(mutableVertex).addEdge(new LongWritable(4000l), new
DoubleWritable(0.4d));
+ verifyNoMoreInteractions(mutableVertex);
+ }
+
+ public void testDifferentSeparators() throws IOException,
InterruptedException {
+ String input = "12345:42.42:9999999:99.9";
+
+ when(rr.getCurrentValue()).thenReturn(new Text(input));
+ conf.set(AdjacencyListVertexReader.LINE_TOKENIZE_VALUE, ":");
+ LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader vr =
+ new LongDoubleDoubleAdjacencyListVertexInputFormat.VertexReader(rr);
+
+ vr.initialize(null, tac);
+ MutableVertex<LongWritable, DoubleWritable, DoubleWritable,
BooleanWritable>
+ mutableVertex = mock(MutableVertex.class);
+ assertTrue("Should have been able to read vertex", vr.next(mutableVertex));
+ verify(mutableVertex).setVertexId(new LongWritable(12345l));
+ verify(mutableVertex).setVertexValue(new DoubleWritable(42.42d));
+ verify(mutableVertex).addEdge(new LongWritable(9999999l),
+ new DoubleWritable(99.9d));
+ verifyNoMoreInteractions(mutableVertex);
+ }
+
+}