Author: edwardyoon
Date: Fri Jan 13 01:06:05 2012
New Revision: 1230852
URL: http://svn.apache.org/viewvc?rev=1230852&view=rev
Log:
HAMA-488 PageRank refactor and fix bugs
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java
incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/ShortestPathsTest.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1230852&r1=1230851&r2=1230852&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Fri Jan 13 01:06:05 2012
@@ -17,6 +17,7 @@ Release 0.4 - Unreleased
BUG FIXES
+ HAMA-488: PageRank refactor and fix bugs (tjungblut via edwardyoon)
HAMA-489: Check port availability before forking child process (edwardyoon)
HAMA-474: ClusterStatus.getTasks() always returns 0 (edwardyoon)
HAMA-472: The task should be killed if it fails to initialize (edwardyoon)
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1230852&r1=1230851&r2=1230852&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Fri Jan 13 01:06:05 2012
@@ -230,7 +230,7 @@ public class BSPJob extends BSPJobContex
}
public int getNumBspTask() {
- // default is 1, because with zero, we will hang in infinity
+ // default is 1, because with zero, we will hang in infinity
return conf.getInt("bsp.peers.num", 1);
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1230852&r1=1230851&r2=1230852&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Fri Jan 13 01:06:05 2012
@@ -897,7 +897,17 @@ public class BSPJobClient extends Config
throw new IOException("Expect one token as the result of "
+ Shell.USER_NAME_COMMAND + ": " + toString(result));
}
- return result[0];
+ String fixResult = fixCygwinName(result[0]);
+ return fixResult;
+ }
+
+ private static String fixCygwinName(String in) {
+ String string = in;
+ if (string.contains("\\")) {
+ // Cygwin/Windows report the user as DOMAIN\name: keep only the part after the last backslash
+ string = string.substring(string.lastIndexOf('\\') + 1);
+ }
+ return string;
}
static String getUnixUserGroupName(String user) throws IOException {
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java?rev=1230852&r1=1230851&r2=1230852&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java Fri Jan 13 01:06:05 2012
@@ -30,6 +30,7 @@ public class ExampleDriver {
pgd.addClass("sssp", ShortestPaths.class, "Single Shortest Path");
pgd.addClass("cmb", CombineExample.class, "Combine");
pgd.addClass("bench", RandBench.class, "Random Benchmark");
+ pgd.addClass("pagerank", PageRank.class, "PageRank");
pgd.driver(args);
} catch (Throwable e) {
e.printStackTrace();
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1230852&r1=1230851&r2=1230852&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Fri Jan 13 01:06:05 2012
@@ -28,25 +28,24 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.ClusterStatus;
import org.apache.hama.bsp.DoubleMessage;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.SequenceFileInputFormat;
import org.apache.hama.bsp.SequenceFileOutputFormat;
import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.graph.VertexArrayWritable;
import org.apache.hama.graph.VertexWritable;
import org.apache.hama.util.KeyValuePair;
public class PageRank extends
- BSP<VertexWritable, ShortestPathVertexArrayWritable, Text, DoubleWritable>
{
+ BSP<VertexWritable, VertexArrayWritable, Text, DoubleWritable> {
+
public static final Log LOG = LogFactory.getLog(PageRank.class);
private final HashMap<VertexWritable, VertexWritable[]> adjacencyList = new
HashMap<VertexWritable, VertexWritable[]>();
@@ -64,29 +63,40 @@ public class PageRank extends
@Override
public void setup(
- BSPPeer<VertexWritable, ShortestPathVertexArrayWritable, Text,
DoubleWritable> peer)
+ BSPPeer<VertexWritable, VertexArrayWritable, Text, DoubleWritable> peer)
throws IOException {
- // map our stuff into ram
- KeyValuePair<VertexWritable, ShortestPathVertexArrayWritable> next = null;
+ DAMPING_FACTOR = Double.parseDouble(conf.get("damping.factor"));
+ EPSILON = Double.parseDouble(conf.get("epsilon.error"));
+ MAX_ITERATIONS = Integer.parseInt(conf.get("max.iterations"));
+ masterTaskName = peer.getPeerName(0);
+
+ // map our stuff into ram
+ KeyValuePair<VertexWritable, VertexArrayWritable> next = null;
while ((next = peer.readNext()) != null) {
- adjacencyList.put(next.getKey(), (ShortestPathVertex[]) next.getValue()
+ adjacencyList.put(next.getKey(), (VertexWritable[]) next.getValue()
.toArray());
vertexLookupMap.put(next.getKey().getName(), next.getKey());
}
- // normally this is the global number of vertices
+ // normally this should be the global number of vertices
numOfVertices = vertexLookupMap.size();
- DAMPING_FACTOR = Double.parseDouble(conf.get("damping.factor"));
ALPHA = (1 - DAMPING_FACTOR) / (double) numOfVertices;
- EPSILON = Double.parseDouble(conf.get("epsilon.error"));
- MAX_ITERATIONS = Integer.parseInt(conf.get("max.iterations"));
- masterTaskName = peer.getPeerName(0);
+
+ // reread the input to save ram
+ peer.reopenInput();
+ VertexWritable key = new VertexWritable();
+ VertexArrayWritable value = new VertexArrayWritable();
+ while (peer.readNext(key, value)) {
+ VertexWritable vertexWritable = vertexLookupMap.get(key.getName());
+ tentativePagerank
+ .put(vertexWritable, Double.valueOf(1.0 / numOfVertices));
+ }
}
@Override
public void bsp(
- BSPPeer<VertexWritable, ShortestPathVertexArrayWritable, Text,
DoubleWritable> peer)
+ BSPPeer<VertexWritable, VertexArrayWritable, Text, DoubleWritable> peer)
throws IOException, SyncException, InterruptedException {
// while the error not converges against epsilon do the pagerank stuff
@@ -142,11 +152,11 @@ public class PageRank extends
@Override
public void cleanup(
- BSPPeer<VertexWritable, ShortestPathVertexArrayWritable, Text,
DoubleWritable> peer) {
+ BSPPeer<VertexWritable, VertexArrayWritable, Text, DoubleWritable> peer)
{
try {
for (Entry<VertexWritable, Double> row : tentativePagerank.entrySet()) {
- peer.write(new Text(row.getKey().getName()), new DoubleWritable(row
- .getValue()));
+ peer.write(new Text(row.getKey().getName()),
+ new DoubleWritable(row.getValue()));
}
} catch (IOException e) {
e.printStackTrace();
@@ -154,7 +164,7 @@ public class PageRank extends
}
private double broadcastError(
- BSPPeer<VertexWritable, ShortestPathVertexArrayWritable, Text,
DoubleWritable> peer,
+ BSPPeer<VertexWritable, VertexArrayWritable, Text, DoubleWritable> peer,
double error) throws IOException, SyncException, InterruptedException {
peer.send(masterTaskName, new DoubleMessage("", error));
peer.sync();
@@ -194,7 +204,7 @@ public class PageRank extends
}
private void sendMessageToNeighbors(
- BSPPeer<VertexWritable, ShortestPathVertexArrayWritable, Text,
DoubleWritable> peer,
+ BSPPeer<VertexWritable, VertexArrayWritable, Text, DoubleWritable> peer,
VertexWritable v) throws IOException {
VertexWritable[] outgoingEdges = adjacencyList.get(v);
for (VertexWritable adjacent : outgoingEdges) {
@@ -206,10 +216,32 @@ public class PageRank extends
}
}
+ static void printOutput(FileSystem fs, Configuration conf) throws IOException {
+ LOG.info("-------------------- RESULTS --------------------");
+ FileStatus[] stati = fs.listStatus(new Path(conf.get("bsp.output.dir")));
+ for (FileStatus status : stati) {
+ if (!status.isDir() && !status.getPath().getName().endsWith(".crc")) {
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, status.getPath(), conf);
+ try { // close the reader even if next() throws, so no file handle leaks
+ Text key = new Text();
+ DoubleWritable value = new DoubleWritable();
+ int count = 0;
+ while (count <= 5 && reader.next(key, value)) { // log at most 6 records per file
+ LOG.info(key.toString() + " | " + value.get());
+ count++;
+ }
+ } finally {
+ reader.close();
+ }
+ }
+ }
+ }
+
public static void printUsage() {
System.out.println("PageRank Example:");
System.out
- .println("<damping factor> <epsilon error> <optional: output path>
<optional: input path>");
+ .println("<input path> <output path> [damping factor] [epsilon error]
[tasks]");
+
}
public static void main(String[] args) throws IOException,
@@ -221,77 +253,34 @@ public class PageRank extends
}
HamaConfiguration conf = new HamaConfiguration(new Configuration());
- BSPJob job = new BSPJob(conf);
- job.setOutputPath(new Path("pagerank/output"));
+ BSPJob job = new BSPJob(conf, PageRank.class);
+ job.setJobName("Pagerank");
- // set the defaults
- conf.set("damping.factor", "0.85");
- conf.set("epsilon.error", "0.000001");
-
- boolean inputGiven = false;
- if (args.length < 2) {
- System.out.println("You have to provide a damping factor and an error!");
- System.out.println("Try using 0.85 0.001 as parameter!");
- System.exit(-1);
- } else {
- conf.set("damping.factor", args[0]);
- conf.set("epsilon.error", args[1]);
- LOG.info("Set damping factor to " + args[0]);
- LOG.info("Set epsilon error to " + args[1]);
- if (args.length > 2) {
- LOG.info("Set output path to " + args[2]);
- job.setOutputPath(new Path(args[2]));
- if (args.length == 4) {
- job.setInputPath(new Path(args[3]));
- LOG.info("Using custom input at " + args[3]);
- inputGiven = true;
- }
- }
+ job.setInputPath(new Path(args[0]));
+ job.setOutputPath(new Path(args[1]));
+
+ conf.set("damping.factor", (args.length > 2) ? args[2] : "0.85");
+ conf.set("epsilon.error", (args.length > 3) ? args[3] : "0.000001");
+ if (args.length == 5) {
+ job.setNumBspTask(Integer.parseInt(args[4]));
}
- BSPJobClient jobClient = new BSPJobClient(conf);
- ClusterStatus cluster = jobClient.getClusterStatus(true);
-
// leave the iterations on default
conf.set("max.iterations", "0");
- if (!inputGiven) {
- Path tmp = new Path("pagerank/input");
- FileSystem.get(conf).delete(tmp, true);
- // ShortestPathsGraphLoader.loadGraph(conf, tmp);
- job.setInputPath(tmp);
- }
-
job.setInputFormat(SequenceFileInputFormat.class);
job.setPartitioner(HashPartitioner.class);
job.setOutputFormat(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
-
- job.setNumBspTask(cluster.getMaxTasks());
+ job.setOutputValueClass(DoubleWritable.class);
job.setBspClass(PageRank.class);
- job.setJarByClass(PageRank.class);
- job.setJobName("Pagerank");
+
+ long startTime = System.currentTimeMillis();
if (job.waitForCompletion(true)) {
printOutput(FileSystem.get(conf), conf);
+ System.out.println("Job Finished in "
+ + (double) (System.currentTimeMillis() - startTime) / 1000.0
+ + " seconds");
}
}
-
- static void printOutput(FileSystem fs, Configuration conf) throws
IOException {
- LOG.info("-------------------- RESULTS --------------------");
- FileStatus[] stati = fs.listStatus(new Path(conf.get("bsp.output.dir")));
- for (FileStatus status : stati) {
- if (!status.isDir() && !status.getPath().getName().endsWith(".crc")) {
- Path path = status.getPath();
- SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
- Text key = new Text();
- DoubleWritable value = new DoubleWritable();
- while (reader.next(key, value)) {
- LOG.info(key.toString() + " | " + value.get());
- }
- reader.close();
- }
- }
- }
-
}
Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java?rev=1230852&r1=1230851&r2=1230852&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java Fri Jan 13 01:06:05 2012
@@ -199,7 +199,7 @@ public class ShortestPaths extends
}
public static void printUsage() {
- System.out.println("Usage: <startNode> <output path> <input path>
[numTasks]");
+ System.out.println("Usage: <startNode> <input path> <output path>
[tasks]");
}
public static void main(String[] args) throws IOException,
@@ -218,8 +218,8 @@ public class ShortestPaths extends
bsp.setJobName("Single Source Shortest Path");
conf.set(START_VERTEX, args[0]);
- bsp.setOutputPath(new Path(args[1]));
- bsp.setInputPath(new Path(args[2]));
+ bsp.setInputPath(new Path(args[1]));
+ bsp.setOutputPath(new Path(args[2]));
if(args.length == 4) {
bsp.setNumBspTask(Integer.parseInt(args[3]));
Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/ShortestPathsTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/ShortestPathsTest.java?rev=1230852&r1=1230851&r2=1230852&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/ShortestPathsTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/ShortestPathsTest.java Fri Jan 13 01:06:05 2012
@@ -48,7 +48,7 @@ public class ShortestPathsTest extends T
generateTestData();
try {
- ShortestPaths.main(new String[] { "Frankfurt", OUTPUT, INPUT });
+ ShortestPaths.main(new String[] { "Frankfurt", INPUT, OUTPUT });
verifyResult();
} finally {