Author: tjungblut
Date: Thu May 17 13:06:31 2012
New Revision: 1339586
URL: http://svn.apache.org/viewvc?rev=1339586&view=rev
Log:
[HAMA-571]: Provide graph repair function in GraphJobRunner
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/util/KeyValuePair.java
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
Modified: incubator/hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1339586&r1=1339585&r2=1339586&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu May 17 13:06:31 2012
@@ -16,7 +16,8 @@ Release 0.5 - April 10, 2012
BUG FIXES
IMPROVEMENTS
-
+
+ HAMA-571: Provide graph repair function in GraphJobRunner (tjungblut)
HAMA-521: Improve message buffering to save memory (Thomas Jungblut via
edwardyoon)
HAMA-494: Remove hard-coded webapp path in HttpServer (edwardyoon)
HAMA-562: Record Reader/Writer objects should be initialized (edwardyoon)
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1339586&r1=1339585&r2=1339586&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
(original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
Thu May 17 13:06:31 2012
@@ -117,7 +117,8 @@ public interface BSPPeer<K1, V1, K2, V2,
public boolean readNext(K1 key, V1 value) throws IOException;
/**
- * Reads the next key value pair and returns it as a pair.
+ * Reads the next key value pair and returns it as a pair. It may reuse a
+ * {@link KeyValuePair} instance to save garbage collection time.
*
* @return null if there are no records left.
* @throws IOException
@@ -134,7 +135,7 @@ public interface BSPPeer<K1, V1, K2, V2,
* @return the jobs configuration
*/
public Configuration getConfiguration();
-
+
/**
* Get the {@link Counter} of the given group with the given name.
*
@@ -151,26 +152,26 @@ public interface BSPPeer<K1, V1, K2, V2,
* @return the <code>Counter</code> of the given group/name.
*/
public Counter getCounter(String group, String name);
-
+
/**
- * Increments the counter identified by the key, which can be of
- * any {@link Enum} type, by the specified amount.
+ * Increments the counter identified by the key, which can be of any
+ * {@link Enum} type, by the specified amount.
*
- * @param key key to identify the counter to be incremented. The key can be
- * be any <code>Enum</code>.
- * @param amount A non-negative amount by which the counter is to
- * be incremented.
+ * @param key key to identify the counter to be incremented. The key can be
+ * any <code>Enum</code>.
+ * @param amount A non-negative amount by which the counter is to be
+ * incremented.
*/
public void incrementCounter(Enum<?> key, long amount);
-
+
/**
- * Increments the counter identified by the group and counter name
- * by the specified amount.
+ * Increments the counter identified by the group and counter name by the
+ * specified amount.
*
* @param group name to identify the group of the counter to be incremented.
* @param counter name to identify the counter within the group.
- * @param amount A non-negative amount by which the counter is to
- * be incremented.
+ * @param amount A non-negative amount by which the counter is to be
+ * incremented.
*/
public void incrementCounter(String group, String counter, long amount);
}
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1339586&r1=1339585&r2=1339586&view=diff
==============================================================================
---
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
(original)
+++
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
Thu May 17 13:06:31 2012
@@ -84,6 +84,7 @@ public final class BSPPeerImpl<K1, V1, K
private OutputCollector<K2, V2> collector;
private RecordReader<K1, V1> in;
private RecordWriter<K2, V2> outWriter;
+ private final KeyValuePair<K1, V1> cachedPair = new KeyValuePair<K1, V1>();
private InetSocketAddress peerAddress;
@@ -480,7 +481,10 @@ public final class BSPPeerImpl<K1, V1, K
K1 k = in.createKey();
V1 v = in.createValue();
if (in.next(k, v)) {
- return new KeyValuePair<K1, V1>(k, v);
+ cachedPair.clear();
+ cachedPair.setKey(k);
+ cachedPair.setValue(v);
+ return cachedPair;
} else {
return null;
}
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/util/KeyValuePair.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/util/KeyValuePair.java?rev=1339586&r1=1339585&r2=1339586&view=diff
==============================================================================
---
incubator/hama/trunk/core/src/main/java/org/apache/hama/util/KeyValuePair.java
(original)
+++
incubator/hama/trunk/core/src/main/java/org/apache/hama/util/KeyValuePair.java
Thu May 17 13:06:31 2012
@@ -18,15 +18,16 @@
package org.apache.hama.util;
/**
- * Immutable class for key values.
- *
- * @param <K>
- * @param <V>
+ * Mutable class for key values.
*/
public class KeyValuePair<K, V> {
- private final K key;
- private final V value;
+ private K key;
+ private V value;
+
+ public KeyValuePair() {
+
+ }
public KeyValuePair(K key, V value) {
super();
@@ -42,4 +43,17 @@ public class KeyValuePair<K, V> {
return value;
}
+ public void setKey(K key) {
+ this.key = key;
+ }
+
+ public void setValue(V value) {
+ this.value = value;
+ }
+
+ public void clear() {
+ this.key = null;
+ this.value = null;
+ }
+
}
Modified:
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1339586&r1=1339585&r2=1339586&view=diff
==============================================================================
---
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
(original)
+++
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
Thu May 17 13:06:31 2012
@@ -34,8 +34,14 @@ import org.apache.hadoop.io.SequenceFile
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.SequenceFileOutputFormat;
import org.apache.hama.examples.MindistSearch.MinTextCombiner;
+import org.apache.hama.examples.MindistSearch.MindistSearchVertex;
import org.apache.hama.examples.util.PagerankTextToSeq;
+import org.apache.hama.graph.GraphJob;
+import org.apache.hama.graph.GraphJobRunner;
import org.apache.hama.graph.VertexArrayWritable;
import org.apache.hama.graph.VertexWritable;
@@ -78,8 +84,8 @@ public class MindistSearchTest extends T
fs = FileSystem.get(conf);
}
- public void testPageRank() throws Exception {
- generateSeqTestData();
+ public void testMindistSearch() throws Exception {
+ generateSeqTestData(tmp);
try {
MindistSearch.main(new String[] { INPUT, OUTPUT });
@@ -112,17 +118,17 @@ public class MindistSearchTest extends T
}
}
- private void generateSeqTestData() throws IOException {
+ private void generateSeqTestData(Map<VertexWritable, VertexArrayWritable>
map)
+ throws IOException {
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(
INPUT), VertexWritable.class, VertexArrayWritable.class);
- for (Map.Entry<VertexWritable, VertexArrayWritable> e : tmp.entrySet()) {
+ for (Map.Entry<VertexWritable, VertexArrayWritable> e : map.entrySet()) {
writer.append(e.getKey(), e.getValue());
}
writer.close();
}
- public void testPageRankUtil() throws IOException, InterruptedException,
- ClassNotFoundException, InstantiationException, IllegalAccessException {
+ public void testPageRankUtil() throws Exception {
generateTestTextData();
// <input path> <output path>
PagerankTextToSeq.main(new String[] { TEXT_INPUT, TEXT_OUTPUT });
@@ -135,6 +141,45 @@ public class MindistSearchTest extends T
}
}
+ public void testRepairFunctionality() throws Exception {
+ // make a copy to be safe with parallel test executions
+ final Map<VertexWritable, VertexArrayWritable> map = new
HashMap<VertexWritable, VertexArrayWritable>(
+ tmp);
+ // removing 7 should result in creating it and getting the same result as
+ // usual
+ map.remove(new VertexWritable("7"));
+ generateSeqTestData(map);
+ try {
+ HamaConfiguration conf = new HamaConfiguration(new Configuration());
+ conf.setBoolean(GraphJobRunner.GRAPH_REPAIR, true);
+ GraphJob connectedComponentsJob = new GraphJob(conf,
+ MindistSearchVertex.class);
+ connectedComponentsJob.setJobName("Mindist Search");
+
+ connectedComponentsJob.setVertexClass(MindistSearchVertex.class);
+ connectedComponentsJob.setInputPath(new Path(INPUT));
+ connectedComponentsJob.setOutputPath(new Path(OUTPUT));
+ // set the min text combiner here
+ connectedComponentsJob.setCombinerClass(MinTextCombiner.class);
+
+ // set the defaults
+ connectedComponentsJob.setMaxIteration(30);
+ connectedComponentsJob.setInputFormat(SequenceFileInputFormat.class);
+ connectedComponentsJob.setPartitioner(HashPartitioner.class);
+ connectedComponentsJob.setOutputFormat(SequenceFileOutputFormat.class);
+ connectedComponentsJob.setOutputKeyClass(Text.class);
+ connectedComponentsJob.setOutputValueClass(Text.class);
+
+ if (connectedComponentsJob.waitForCompletion(true)) {
+ verifyResult();
+ } else {
+ fail("Job not completed correctly!");
+ }
+ } finally {
+ deleteTempDirs();
+ }
+ }
+
private void generateTestTextData() throws IOException {
BufferedWriter writer = new BufferedWriter(new FileWriter(TEXT_INPUT));
for (Map.Entry<VertexWritable, VertexArrayWritable> e : tmp.entrySet()) {
Modified:
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1339586&r1=1339585&r2=1339586&view=diff
==============================================================================
---
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
(original)
+++
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
Thu May 17 13:06:31 2012
@@ -32,7 +32,14 @@ import org.apache.hadoop.io.DoubleWritab
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.examples.PageRank.PageRankVertex;
import org.apache.hama.examples.util.PagerankTextToSeq;
+import org.apache.hama.graph.AverageAggregator;
+import org.apache.hama.graph.GraphJob;
+import org.apache.hama.graph.GraphJobRunner;
import org.apache.hama.graph.VertexArrayWritable;
import org.apache.hama.graph.VertexWritable;
@@ -87,7 +94,7 @@ public class PageRankTest extends TestCa
}
public void testPageRank() throws Exception {
- generateSeqTestData();
+ generateSeqTestData(tmp);
try {
// Usage: <input> <output> [damping factor (default 0.85)] [Epsilon
// (convergence error, default 0.001)] [Max iterations (default 30)]
@@ -113,7 +120,8 @@ public class PageRankTest extends TestCa
assertTrue(sum > 0.99d && sum <= 1d);
}
- private void generateSeqTestData() throws IOException {
+ private void generateSeqTestData(Map<VertexWritable, VertexArrayWritable>
tmp)
+ throws IOException {
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(
INPUT), VertexWritable.class, VertexArrayWritable.class);
for (Map.Entry<VertexWritable, VertexArrayWritable> e : tmp.entrySet()) {
@@ -137,6 +145,47 @@ public class PageRankTest extends TestCa
}
}
+ public void testRepairFunctionality() throws Exception {
+ // make a copy to be safe with parallel test executions
+ final Map<VertexWritable, VertexArrayWritable> map = new
HashMap<VertexWritable, VertexArrayWritable>(
+ tmp);
+ // removing google should result in creating it and getting the same
+ // result as usual
+ map.remove(new VertexWritable("google.com"));
+ generateSeqTestData(map);
+ try {
+ HamaConfiguration conf = new HamaConfiguration(new Configuration());
+ conf.setBoolean(GraphJobRunner.GRAPH_REPAIR, true);
+ GraphJob pageJob = new GraphJob(conf, PageRank.class);
+ pageJob.setJobName("Pagerank");
+
+ pageJob.setVertexClass(PageRankVertex.class);
+ pageJob.setInputPath(new Path(INPUT));
+ pageJob.setOutputPath(new Path(OUTPUT));
+
+ // set the defaults
+ pageJob.setMaxIteration(30);
+ pageJob.set("hama.pagerank.alpha", "0.85");
+ // we need to include a vertex in its adjacency list,
+ // otherwise the pagerank result has a constant loss
+ pageJob.set("hama.graph.self.ref", "true");
+
+ pageJob.setAggregatorClass(AverageAggregator.class);
+
+ pageJob.setInputFormat(SequenceFileInputFormat.class);
+ pageJob.setPartitioner(HashPartitioner.class);
+ pageJob.setOutputFormat(SequenceFileOutputFormat.class);
+ pageJob.setOutputKeyClass(Text.class);
+ pageJob.setOutputValueClass(DoubleWritable.class);
+
+ if (!pageJob.waitForCompletion(true)) {
+ fail("Job did not complete normally!");
+ }
+ } finally {
+ deleteTempDirs();
+ }
+ }
+
private void generateTestTextData() throws IOException {
BufferedWriter writer = new BufferedWriter(new FileWriter(TEXT_INPUT));
for (Map.Entry<VertexWritable, VertexArrayWritable> e : tmp.entrySet()) {
Modified:
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java?rev=1339586&r1=1339585&r2=1339586&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
(original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
Thu May 17 13:06:31 2012
@@ -44,6 +44,7 @@ public class Edge {
}
public String toString() {
- return this.getDestVertexID() + ":" + this.getCost();
+ return this.getName() + " -> " + this.getDestVertexID() + ":"
+ + this.getCost();
}
}
Modified:
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1339586&r1=1339585&r2=1339586&view=diff
==============================================================================
---
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Thu May 17 13:06:31 2012
@@ -19,6 +19,8 @@ package org.apache.hama.graph;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -57,7 +59,8 @@ public class GraphJobRunner extends BSP
private static final Text FLAG_AGGREGATOR_INCREMENT = new Text(
S_FLAG_AGGREGATOR_INCREMENT);
- private static final String MESSAGE_COMBINER_CLASS =
"hama.vertex.message.combiner.class";
+ public static final String MESSAGE_COMBINER_CLASS =
"hama.vertex.message.combiner.class";
+ public static final String GRAPH_REPAIR = "hama.graph.repair";
private Configuration conf;
private Combiner<? extends Writable> combiner;
@@ -81,13 +84,14 @@ public class GraphJobRunner extends BSP
private int maxIteration = -1;
private long iteration;
- // TODO check if our graph is not broken and repair
public void setup(BSPPeer peer) throws IOException, SyncException,
InterruptedException {
this.conf = peer.getConfiguration();
// Choose one as a master to collect global updates
this.masterTask = peer.getPeerName(0);
+ boolean repairNeeded = conf.getBoolean(GRAPH_REPAIR, false);
+
if (!conf.getClass(MESSAGE_COMBINER_CLASS, Combiner.class).equals(
Combiner.class)) {
LOG.debug("Combiner class: " + conf.get(MESSAGE_COMBINER_CLASS));
@@ -110,7 +114,7 @@ public class GraphJobRunner extends BSP
}
}
- loadVertices(peer);
+ loadVertices(peer, repairNeeded);
numberVertices = vertices.size() * peer.getNumPeers();
// TODO refactor this to a single step
for (Map.Entry<String, Vertex> e : vertices.entrySet()) {
@@ -144,8 +148,8 @@ public class GraphJobRunner extends BSP
// Map <vertexID, messages>
final Map<String, LinkedList<Writable>> messages = parseMessages(peer);
- if (isMasterTask(peer) && peer.getSuperstepCount() > 1) {
-
+ // use iterations here, since repair can skew the number of supersteps
+ if (isMasterTask(peer) && iteration > 1) {
MapWritable updatedCnt = new MapWritable();
// exit if there's no update made
if (globalUpdateCounts == 0) {
@@ -173,7 +177,7 @@ public class GraphJobRunner extends BSP
}
// if we have an aggregator defined, we must make an additional sync
// to have the updated values available on all our peers.
- if (aggregator != null && peer.getSuperstepCount() > 1) {
+ if (aggregator != null && iteration > 1) {
peer.sync();
MapWritable updatedValues = (MapWritable) peer.getCurrentMessage();
@@ -273,7 +277,8 @@ public class GraphJobRunner extends BSP
return msgMap;
}
- private void loadVertices(BSPPeer peer) throws IOException {
+ private void loadVertices(BSPPeer peer, boolean repairNeeded)
+ throws IOException {
LOG.debug("vertex class: " + conf.get("hama.graph.vertex.class"));
boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
KeyValuePair<? extends VertexWritable, ? extends VertexArrayWritable> next
= null;
@@ -304,6 +309,53 @@ public class GraphJobRunner extends BSP
vertex.setup(conf);
vertices.put(next.getKey().getName(), vertex);
}
+
+ /*
+ * If the user wants to repair the graph, it should traverse through that
+ * local chunk of adjacency list and message the corresponding peer to
+ * check whether that vertex exists. In real-life this may be dead-ending
+ * vertices, since we have no information about outgoing edges. Mainly this
+ * procedure is to prevent NullPointerExceptions from happening.
+ */
+ if (repairNeeded) {
+ LOG.debug("Starting repair of this graph!");
+ final Collection<Vertex> entries = vertices.values();
+ for (Vertex entry : entries) {
+ List<Edge> outEdges = entry.getOutEdges();
+ for (Edge e : outEdges) {
+ peer.send(e.getDestVertexID(), new Text(e.getName()));
+ }
+ }
+ try {
+ peer.sync();
+ } catch (Exception e) {
+ // we can't really recover from that, so fail this task
+ throw new RuntimeException(e);
+ }
+ Text vertexName = null;
+ while ((vertexName = (Text) peer.getCurrentMessage()) != null) {
+ String vName = vertexName.toString();
+ if (!vertices.containsKey(vName)) {
+ Vertex<? extends Writable> vertex = (Vertex<? extends Writable>)
ReflectionUtils
+ .newInstance(
+ conf.getClass("hama.graph.vertex.class", Vertex.class),
conf);
+ vertex.peer = peer;
+ vertex.setVertexID(vName);
+ vertex.runner = this;
+ if (selfReference) {
+ String target = peer.getPeerName(Math.abs((vertex.hashCode() % peer
+ .getAllPeerNames().length)));
+ vertex.edges = Collections
+ .singletonList(new Edge(vertex.getVertexID(), target, 0));
+ } else {
+ vertex.edges = Collections.emptyList();
+ }
+ vertex.setup(conf);
+ vertices.put(vName, vertex);
+ }
+ }
+ }
+
}
/**
Modified:
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1339586&r1=1339585&r2=1339586&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
(original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
Thu May 17 13:06:31 2012
@@ -119,7 +119,8 @@ public abstract class Vertex<M extends W
@Override
public String toString() {
- return getVertexID() + "=" + getValue();
+ return getVertexID() + (getValue() != null ? " = " + getValue() : "")
+ + " // " + edges;
}
}