Author: tjungblut
Date: Tue Jun 19 13:08:09 2012
New Revision: 1351702
URL: http://svn.apache.org/viewvc?rev=1351702&view=rev
Log:
[HAMA-591]: Improve Pagerank
Modified:
hama/trunk/CHANGES.txt
hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1351702&r1=1351701&r2=1351702&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Tue Jun 19 13:08:09 2012
@@ -3,7 +3,8 @@ Hama Change Log
Release 0.5 - April 10, 2012
NEW FEATURES
-
+
+ HAMA-591: Improve Pagerank (tjungblut)
HAMA-550: Implementation of Bipartite Matching (Apurv Verma via tjungblut)
HAMA-588: Add voteToHalt() mechanism in Graph API (edwardyoon)
HAMA-566: Add disk-based queue (tjungblut)
Modified:
hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1351702&r1=1351701&r2=1351702&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Tue Jun 19 13:08:09 2012
@@ -30,12 +30,16 @@ import org.apache.hama.HamaConfiguration
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.SequenceFileOutputFormat;
import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.graph.AbstractAggregator;
import org.apache.hama.graph.AverageAggregator;
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.Vertex;
import org.apache.hama.graph.VertexInputReader;
+/**
+ * Real pagerank with dangling node contribution.
+ */
public class PageRank {
public static class PageRankVertex extends
@@ -64,17 +68,22 @@ public class PageRank {
// initialize this vertex to 1 / count of global vertices in this graph
if (this.getSuperstepCount() == 0) {
this.setValue(new DoubleWritable(1.0 / this.getNumVertices()));
- }
-
- // in the first superstep, there are no messages to check
- if (this.getSuperstepCount() >= 1) {
+ } else if (this.getSuperstepCount() >= 1) {
+ DoubleWritable danglingNodeContribution = getLastAggregatedValue(1);
double sum = 0;
while (messages.hasNext()) {
DoubleWritable msg = messages.next();
sum += msg.get();
}
- double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
- this.setValue(new DoubleWritable(alpha + (DAMPING_FACTOR * sum)));
+ if (danglingNodeContribution == null) {
+ double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
+ this.setValue(new DoubleWritable(alpha + (DAMPING_FACTOR * sum)));
+ } else {
+ double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
+ this.setValue(new DoubleWritable(alpha
+ + (DAMPING_FACTOR * (sum + danglingNodeContribution.get()
+ / this.getNumVertices()))));
+ }
}
// if we have not reached our global error yet, then proceed.
@@ -130,6 +139,18 @@ public class PageRank {
printUsage();
HamaConfiguration conf = new HamaConfiguration(new Configuration());
+ GraphJob pageJob = createJob(args, conf);
+
+ long startTime = System.currentTimeMillis();
+ if (pageJob.waitForCompletion(true)) {
+ System.out.println("Job Finished in "
+ + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static GraphJob createJob(String[] args, HamaConfiguration conf)
+ throws IOException {
GraphJob pageJob = new GraphJob(conf, PageRank.class);
pageJob.setJobName("Pagerank");
@@ -140,9 +161,6 @@ public class PageRank {
// set the defaults
pageJob.setMaxIteration(30);
pageJob.set("hama.pagerank.alpha", "0.85");
- // we need to include a vertex in its adjacency list,
- // otherwise the pagerank result has a constant loss
- pageJob.set("hama.graph.self.ref", "true");
if (args.length == 6)
pageJob.setNumBspTask(Integer.parseInt(args[5]));
@@ -153,7 +171,9 @@ public class PageRank {
if (args.length >= 3)
pageJob.set("hama.pagerank.alpha", args[2]);
- pageJob.setAggregatorClass(AverageAggregator.class);
+ // error, dangling node probability sum
+ pageJob.setAggregatorClass(AverageAggregator.class,
+ DanglingNodeAggregator.class);
pageJob.setVertexIDClass(Text.class);
pageJob.setVertexValueClass(DoubleWritable.class);
@@ -167,11 +187,31 @@ public class PageRank {
pageJob.setOutputFormat(SequenceFileOutputFormat.class);
pageJob.setOutputKeyClass(Text.class);
pageJob.setOutputValueClass(DoubleWritable.class);
+ return pageJob;
+ }
- long startTime = System.currentTimeMillis();
- if (pageJob.waitForCompletion(true)) {
- System.out.println("Job Finished in "
- + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+ public static class DanglingNodeAggregator
+ extends
+ AbstractAggregator<DoubleWritable, Vertex<Text, NullWritable, DoubleWritable>> {
+
+ double danglingNodeSum;
+
+ @Override
+ public void aggregate(Vertex<Text, NullWritable, DoubleWritable> vertex,
+ DoubleWritable value) {
+ if (vertex != null) {
+ if (vertex.getEdges().size() == 0) {
+ danglingNodeSum += value.get();
+ }
+ } else {
+ danglingNodeSum += value.get();
+ }
+ }
+
+ @Override
+ public DoubleWritable getValue() {
+ return new DoubleWritable(danglingNodeSum);
}
+
}
}
Modified:
hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1351702&r1=1351701&r2=1351702&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Tue Jun 19 13:08:09 2012
@@ -28,16 +28,9 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.SequenceFileOutputFormat;
-import org.apache.hama.bsp.TextInputFormat;
-import org.apache.hama.examples.PageRank.PageRankVertex;
-import org.apache.hama.graph.AverageAggregator;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.GraphJobRunner;
@@ -55,11 +48,11 @@ public class PageRankTest extends TestCa
* functionality.
*/
String[] input = new String[] { "stackoverflow.com\tyahoo.com",
- "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov]",
- "yahoo.com\tnasa.gov\tstackoverflow.com]",
- "twitter.com\tgoogle.com\tfacebook.com]",
- "nasa.gov\tyahoo.com\tstackoverflow.com]",
- "youtube.com\tgoogle.com\tyahoo.com]" };
+ "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov",
+ "yahoo.com\tnasa.gov\tstackoverflow.com",
+ "twitter.com\tgoogle.com\tfacebook.com",
+ "nasa.gov\tyahoo.com\tstackoverflow.com",
+ "youtube.com\tgoogle.com\tyahoo.com" };
private static String INPUT = "/tmp/pagerank-tmp.seq";
private static String TEXT_INPUT = "/tmp/pagerank.txt";
@@ -84,6 +77,7 @@ public class PageRankTest extends TestCa
DoubleWritable value = new DoubleWritable();
while (reader.next(key, value)) {
+ System.out.println(key + " / " + value);
sum += value.get();
}
}
@@ -95,33 +89,10 @@ public class PageRankTest extends TestCa
generateTestData();
try {
HamaConfiguration conf = new HamaConfiguration(new Configuration());
+ conf.set("bsp.local.tasks.maximum", "1");
conf.setBoolean(GraphJobRunner.GRAPH_REPAIR, true);
- GraphJob pageJob = new GraphJob(conf, PageRank.class);
- pageJob.setJobName("Pagerank");
-
- pageJob.setVertexClass(PageRankVertex.class);
- pageJob.setInputPath(new Path(INPUT));
- pageJob.setOutputPath(new Path(OUTPUT));
- pageJob.setNumBspTask(2);
- // set the defaults
- pageJob.setMaxIteration(30);
- pageJob.set("hama.pagerank.alpha", "0.85");
- // we need to include a vertex in its adjacency list,
- // otherwise the pagerank result has a constant loss
- pageJob.set("hama.graph.self.ref", "true");
-
- pageJob.setAggregatorClass(AverageAggregator.class);
- pageJob.setInputKeyClass(LongWritable.class);
- pageJob.setInputValueClass(Text.class);
- pageJob.setInputFormat(TextInputFormat.class);
- pageJob.setPartitioner(HashPartitioner.class);
- pageJob.setOutputFormat(SequenceFileOutputFormat.class);
- pageJob.setOutputKeyClass(Text.class);
- pageJob.setOutputValueClass(DoubleWritable.class);
- pageJob.setVertexInputReaderClass(PageRank.PagerankTextReader.class);
- pageJob.setVertexIDClass(Text.class);
- pageJob.setVertexValueClass(DoubleWritable.class);
- pageJob.setEdgeValueClass(NullWritable.class);
+ GraphJob pageJob = PageRank.createJob(new String[] { INPUT, OUTPUT },
+ conf);
if (!pageJob.waitForCompletion(true)) {
fail("Job did not complete normally!");
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java?rev=1351702&r1=1351701&r2=1351702&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java Tue Jun 19 13:08:09 2012
@@ -38,7 +38,7 @@ public class AbsDiffAggregator extends
}
}
- // we a master aggregates he aggregated values, he calls this, so let's just
+ // when a master aggregates he aggregated values, he calls this, so let's just
// sum up here.
@Override
public void aggregate(Vertex<?, ?, DoubleWritable> vertex,
Modified:
hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java?rev=1351702&r1=1351701&r2=1351702&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java Tue Jun 19 13:08:09 2012
@@ -30,6 +30,7 @@ import org.apache.hama.graph.Vertex;
import org.apache.hama.graph.VertexInputReader;
public class PageRank {
+
public static class PageRankVertex extends
Vertex<Text, NullWritable, DoubleWritable> {
@@ -56,24 +57,21 @@ public class PageRank {
// initialize this vertex to 1 / count of global vertices in this graph
if (this.getSuperstepCount() == 0) {
this.setValue(new DoubleWritable(1.0 / this.getNumVertices()));
- }
-
- // in the first superstep, there are no messages to check
- if (this.getSuperstepCount() >= 1) {
+ } else if (this.getSuperstepCount() >= 1) {
+ DoubleWritable danglingNodeContribution = getLastAggregatedValue(1);
double sum = 0;
while (messages.hasNext()) {
DoubleWritable msg = messages.next();
sum += msg.get();
}
- double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
- this.setValue(new DoubleWritable(alpha + (DAMPING_FACTOR * sum)));
- if (this.getSuperstepCount() > 1) {
- if (this.getLastAggregatedValue(1).get() < 0.99
- || this.getLastAggregatedValue(1).get() > 1.1) {
- throw new RuntimeException(
- "Sum aggregator hasn't summed correctly! "
- + this.getLastAggregatedValue(1).get());
- }
+ if (danglingNodeContribution == null) {
+ double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
+ this.setValue(new DoubleWritable(alpha + (DAMPING_FACTOR * sum)));
+ } else {
+ double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
+ this.setValue(new DoubleWritable(alpha
+ + (DAMPING_FACTOR * (sum + danglingNodeContribution.get()
+ / this.getNumVertices()))));
}
}