Author: edwardyoon
Date: Wed Nov 16 02:01:55 2011
New Revision: 1202506
URL: http://svn.apache.org/viewvc?rev=1202506&view=rev
Log:
Fix sssp example
Added:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertexArrayWritable.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/VertexArrayWritable.java
Removed:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRankBase.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsBase.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/partitioning/
Modified:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/Vertex.java
Modified:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java?rev=1202506&r1=1202505&r2=1202506&view=diff
==============================================================================
---
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
(original)
+++
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
Wed Nov 16 02:01:55 2011
@@ -20,6 +20,7 @@
package org.apache.hama.examples;
import org.apache.hadoop.util.ProgramDriver;
+import org.apache.hama.examples.graph.ShortestPaths;
public class ExampleDriver {
@@ -27,6 +28,7 @@ public class ExampleDriver {
ProgramDriver pgd = new ProgramDriver();
try {
pgd.addClass("pi", PiEstimator.class, "Pi Estimator");
+ pgd.addClass("sssp", ShortestPaths.class, "Single Shortest Path");
pgd.driver(args);
} catch (Throwable e) {
Modified:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java?rev=1202506&r1=1202505&r2=1202506&view=diff
==============================================================================
---
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java
(original)
+++
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java
Wed Nov 16 02:01:55 2011
@@ -18,54 +18,75 @@
package org.apache.hama.examples.graph;
import java.io.IOException;
-import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.ClusterStatus;
import org.apache.hama.bsp.DoubleMessage;
-import org.apache.hama.bsp.OutputCollector;
-import org.apache.hama.bsp.RecordReader;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.util.KeyValuePair;
import org.apache.zookeeper.KeeperException;
-public class PageRank extends PageRankBase {
+public class PageRank extends
+ BSP<Vertex, ShortestPathVertexArrayWritable, Text, DoubleWritable> {
public static final Log LOG = LogFactory.getLog(PageRank.class);
- private final HashMap<Vertex, List<Vertex>> adjacencyList = new
HashMap<Vertex, List<Vertex>>();
- private final HashMap<String, Vertex> lookupMap = new HashMap<String,
Vertex>();
+ private final HashMap<Vertex, Vertex[]> adjacencyList = new HashMap<Vertex,
Vertex[]>();
+ private final HashMap<String, Vertex> vertexLookupMap = new HashMap<String,
Vertex>();
private final HashMap<Vertex, Double> tentativePagerank = new
HashMap<Vertex, Double>();
// backup of the last pagerank to determine the error
private final HashMap<Vertex, Double> lastTentativePagerank = new
HashMap<Vertex, Double>();
- private String[] peerNames;
+
+ protected static int MAX_ITERATIONS = 30;
+ protected static String masterTaskName;
+ protected static double ALPHA;
+ protected static int numOfVertices;
+ protected static double DAMPING_FACTOR = 0.85;
+ protected static double EPSILON = 0.001;
@Override
- public void setup(BSPPeer peer) {
- Configuration conf = peer.getConfiguration();
- numOfVertices = Integer.parseInt(conf.get("num.vertices"));
+ public void setup(
+ BSPPeer<Vertex, ShortestPathVertexArrayWritable, Text, DoubleWritable>
peer)
+ throws IOException {
+ // map our stuff into ram
+
+ KeyValuePair<Vertex, ShortestPathVertexArrayWritable> next = null;
+ while ((next = peer.readNext()) != null) {
+ adjacencyList.put(next.getKey(), (ShortestPathVertex[]) next.getValue()
+ .toArray());
+ vertexLookupMap.put(next.getKey().getName(), next.getKey());
+ }
+
+ // normally this is the global number of vertices
+ numOfVertices = vertexLookupMap.size();
DAMPING_FACTOR = Double.parseDouble(conf.get("damping.factor"));
ALPHA = (1 - DAMPING_FACTOR) / (double) numOfVertices;
EPSILON = Double.parseDouble(conf.get("epsilon.error"));
MAX_ITERATIONS = Integer.parseInt(conf.get("max.iterations"));
- peerNames = conf.get(ShortestPaths.BSP_PEERS).split(";");
+ masterTaskName = peer.getPeerName(0);
}
@Override
- public void bsp(BSPPeer peer) throws IOException, KeeperException,
- InterruptedException {
- String master = peer.getConfiguration().get(MASTER_TASK);
- // setup the datasets
- PageRankBase.mapAdjacencyList(peer.getConfiguration(), peer, adjacencyList,
- tentativePagerank, lookupMap);
+ public void bsp(
+ BSPPeer<Vertex, ShortestPathVertexArrayWritable, Text, DoubleWritable>
peer)
+ throws IOException, KeeperException, InterruptedException {
// while the error not converges against epsilon do the pagerank stuff
double error = 1.0;
@@ -83,7 +104,10 @@ public class PageRank extends PageRankBa
HashMap<Vertex, Double> sumMap = new HashMap<Vertex, Double>();
DoubleMessage msg = null;
while ((msg = (DoubleMessage) peer.getCurrentMessage()) != null) {
- Vertex k = lookupMap.get(msg.getTag());
+ Vertex k = vertexLookupMap.get(msg.getTag());
+ if (k == null) {
+ LOG.fatal("If you see this, partitioning has totally failed.");
+ }
if (!sumMap.containsKey(k)) {
sumMap.put(k, msg.getData());
} else {
@@ -100,7 +124,7 @@ public class PageRank extends PageRankBa
// determine the error and send this to the master
double err = determineError();
- error = broadcastError(peer, master, err);
+ error = broadcastError(peer, err);
}
// in every step send the tentative pagerank of a vertex to its
// adjacent vertices
@@ -111,18 +135,29 @@ public class PageRank extends PageRankBa
iteration++;
}
- // Clears all queues entries.
+ // Clears all queues entries after we finished.
peer.clear();
- // finally save the chunk of pageranks
- PageRankBase.savePageRankMap(peer, peer.getConfiguration(),
- lastTentativePagerank);
}
- private double broadcastError(BSPPeer peer, String master, double error)
- throws IOException, KeeperException, InterruptedException {
- peer.send(master, new DoubleMessage("", error));
+ @Override
+ public void cleanup(
+ BSPPeer<Vertex, ShortestPathVertexArrayWritable, Text, DoubleWritable>
peer) {
+ try {
+ for (Entry<Vertex, Double> row : tentativePagerank.entrySet()) {
+ peer.write(new Text(row.getKey().getName()),
+ new DoubleWritable(row.getValue()));
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private double broadcastError(
+ BSPPeer<Vertex, ShortestPathVertexArrayWritable, Text, DoubleWritable>
peer,
+ double error) throws IOException, KeeperException, InterruptedException {
+ peer.send(masterTaskName, new DoubleMessage("", error));
peer.sync();
- if (peer.getPeerName().equals(master)) {
+ if (peer.getPeerName().equals(masterTaskName)) {
double errorSum = 0.0;
int count = 0;
DoubleMessage message;
@@ -157,15 +192,16 @@ public class PageRank extends PageRankBa
}
}
- private void sendMessageToNeighbors(BSPPeer peer, Vertex v)
- throws IOException {
- List<Vertex> outgoingEdges = adjacencyList.get(v);
+ private void sendMessageToNeighbors(
+ BSPPeer<Vertex, ShortestPathVertexArrayWritable, Text, DoubleWritable>
peer,
+ Vertex v) throws IOException {
+ Vertex[] outgoingEdges = adjacencyList.get(v);
for (Vertex adjacent : outgoingEdges) {
- int mod = Math.abs(adjacent.getId() % peerNames.length);
+ int mod = Math.abs(adjacent.hashCode() % peer.getNumPeers());
// send a message of the tentative pagerank divided by the size of
// the outgoing edges to all adjacents
- peer.send(peerNames[mod], new DoubleMessage(adjacent.getName(),
- tentativePagerank.get(v) / outgoingEdges.size()));
+ peer.send(peer.getPeerName(mod), new DoubleMessage(adjacent.getName(),
+ tentativePagerank.get(v) / outgoingEdges.length));
}
}
@@ -184,10 +220,14 @@ public class PageRank extends PageRankBa
}
HamaConfiguration conf = new HamaConfiguration(new Configuration());
+ BSPJob job = new BSPJob(conf);
+ job.setOutputPath(new Path("pagerank/output"));
+
// set the defaults
conf.set("damping.factor", "0.85");
conf.set("epsilon.error", "0.000001");
+ boolean inputGiven = false;
if (args.length < 2) {
System.out.println("You have to provide a damping factor and an error!");
System.out.println("Try using 0.85 0.001 as parameter!");
@@ -198,17 +238,13 @@ public class PageRank extends PageRankBa
LOG.info("Set damping factor to " + args[0]);
LOG.info("Set epsilon error to " + args[1]);
if (args.length > 2) {
- conf.set("out.path", args[2]);
LOG.info("Set output path to " + args[2]);
+ job.setOutputPath(new Path(args[2]));
if (args.length == 4) {
- conf.set("in.path", args[3]);
+ job.setInputPath(new Path(args[3]));
LOG.info("Using custom input at " + args[3]);
- } else {
- LOG.info("Running default example graph!");
+ inputGiven = true;
}
- } else {
- conf.set("out.path", "pagerank/output");
- LOG.info("Set output path to default of pagerank/output!");
}
}
@@ -218,30 +254,43 @@ public class PageRank extends PageRankBa
// leave the iterations on default
conf.set("max.iterations", "0");
- Collection<String> activeGrooms = cluster.getActiveGroomNames().keySet();
- String[] grooms = activeGrooms.toArray(new String[activeGrooms.size()]);
-
- if (conf.get("in.path") == null) {
- conf = PageRankBase.partitionExample(new Path(conf.get("out.path")),
- conf, grooms);
- } else {
- conf = PageRankBase.partitionTextFile(new Path(conf.get("in.path")),
- conf, grooms);
+ if (!inputGiven) {
+ Path tmp = new Path("pagerank/input");
+ FileSystem.get(conf).delete(tmp, true);
+ //ShortestPathsGraphLoader.loadGraph(conf, tmp);
+ job.setInputPath(tmp);
}
- BSPJob job = new BSPJob(conf);
- job.setNumBspTask(cluster.getGroomServers());
+ job.setInputFormat(SequenceFileInputFormat.class);
+ job.setPartitioner(HashPartitioner.class);
+ job.setOutputFormat(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(IntWritable.class);
+
+ job.setNumBspTask(cluster.getMaxTasks());
job.setBspClass(PageRank.class);
job.setJarByClass(PageRank.class);
job.setJobName("Pagerank");
if (job.waitForCompletion(true)) {
- PageRankBase.printOutput(FileSystem.get(conf), conf);
+ printOutput(FileSystem.get(conf), conf);
}
}
- @Override
- public void cleanup(BSPPeer peer) {
- // TODO Auto-generated method stub
-
+ static void printOutput(FileSystem fs, Configuration conf) throws
IOException {
+ LOG.info("-------------------- RESULTS --------------------");
+ FileStatus[] stati = fs.listStatus(new Path(conf.get("bsp.output.dir")));
+ for (FileStatus status : stati) {
+ if (!status.isDir() && !status.getPath().getName().endsWith(".crc")) {
+ Path path = status.getPath();
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+ Text key = new Text();
+ DoubleWritable value = new DoubleWritable();
+ while (reader.next(key, value)) {
+ LOG.info(key.toString() + " | " + value.get());
+ }
+ reader.close();
+ }
+ }
}
+
}
Modified:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java?rev=1202506&r1=1202505&r2=1202506&view=diff
==============================================================================
---
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java
(original)
+++
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java
Wed Nov 16 02:01:55 2011
@@ -24,7 +24,7 @@ import java.io.IOException;
public final class ShortestPathVertex extends Vertex {
private int weight;
- private Integer cost;
+ private int cost = Integer.MAX_VALUE;
public ShortestPathVertex() {
}
@@ -34,7 +34,7 @@ public final class ShortestPathVertex ex
this.weight = weight;
}
- public ShortestPathVertex(int weight, String name, Integer cost) {
+ public ShortestPathVertex(int weight, String name, int cost) {
super(name);
this.weight = weight;
this.cost = cost;
@@ -44,7 +44,7 @@ public final class ShortestPathVertex ex
return name;
}
- public Integer getCost() {
+ public int getCost() {
return cost;
}
@@ -52,10 +52,6 @@ public final class ShortestPathVertex ex
this.cost = cost;
}
- public int getId() {
- return id;
- }
-
public int getWeight() {
return weight;
}
Added:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertexArrayWritable.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertexArrayWritable.java?rev=1202506&view=auto
==============================================================================
---
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertexArrayWritable.java
(added)
+++
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertexArrayWritable.java
Wed Nov 16 02:01:55 2011
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.graph;
+
+import org.apache.hadoop.io.ArrayWritable;
+
+public class ShortestPathVertexArrayWritable extends ArrayWritable {
+
+ public ShortestPathVertexArrayWritable() {
+ super(ShortestPathVertex.class);
+ }
+
+}
Modified:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java?rev=1202506&r1=1202505&r2=1202506&view=diff
==============================================================================
---
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java
(original)
+++
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java
Wed Nov 16 02:01:55 2011
@@ -18,56 +18,47 @@
package org.apache.hama.examples.graph;
import java.io.IOException;
-import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.fs.FileStatus;
+import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.BSPJobClient;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BooleanMessage;
import org.apache.hama.bsp.ClusterStatus;
+import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.IntegerMessage;
-import org.apache.hama.bsp.OutputCollector;
-import org.apache.hama.bsp.RecordReader;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.util.KeyValuePair;
import org.apache.zookeeper.KeeperException;
-public class ShortestPaths extends ShortestPathsBase {
-
+public class ShortestPaths extends
+ BSP<ShortestPathVertex, ShortestPathVertexArrayWritable, Text,
IntWritable> {
public static final Log LOG = LogFactory.getLog(ShortestPaths.class);
- private final HashMap<ShortestPathVertex, List<ShortestPathVertex>>
adjacencyList = new HashMap<ShortestPathVertex, List<ShortestPathVertex>>();
+ public static final String START_VERTEX = "shortest.paths.start.vertex.name";
private final HashMap<String, ShortestPathVertex> vertexLookupMap = new
HashMap<String, ShortestPathVertex>();
- private String[] peerNames;
+ private final HashMap<ShortestPathVertex, ShortestPathVertex[]>
adjacencyList = new HashMap<ShortestPathVertex, ShortestPathVertex[]>();
+ private String masterTask;
@Override
- public void bsp(BSPPeer peer)
+ public void bsp(
+ BSPPeer<ShortestPathVertex, ShortestPathVertexArrayWritable, Text,
IntWritable> peer)
throws IOException, KeeperException, InterruptedException {
- // map our input into ram
- mapAdjacencyList(peer.getConfiguration(), peer, adjacencyList,
- vertexLookupMap);
- // parse the configuration to get the peerNames
- parsePeerNames(peer.getConfiguration());
- // get our master groom
- String master = peer.getConfiguration().get(MASTER_TASK);
-
- // initial message bypass
- ShortestPathVertex v = vertexLookupMap.get(peer.getConfiguration().get(
- SHORTEST_PATHS_START_VERTEX_ID));
- if (v != null) {
- v.setCost(0);
- sendMessageToNeighbors(peer, v);
- }
-
boolean updated = true;
while (updated) {
int updatesMade = 0;
@@ -85,23 +76,51 @@ public class ShortestPaths extends Short
}
}
// synchonize with all grooms if there were updates
- updated = broadcastUpdatesMade(peer, master, updatesMade);
+ updated = broadcastUpdatesMade(peer, updatesMade);
// send updates to the adjacents of the updated vertices
for (ShortestPathVertex vertex : updatedQueue) {
sendMessageToNeighbors(peer, vertex);
}
}
- // finished, finally save our map to DFS.
- saveVertexMap(peer.getConfiguration(), peer, adjacencyList);
}
- /**
- * Parses the peer names to fix inconsistency in bsp peer names from context.
- *
- * @param conf
- */
- private void parsePeerNames(Configuration conf) {
- peerNames = conf.get(BSP_PEERS).split(";");
+ @Override
+ public void setup(
+ BSPPeer<ShortestPathVertex, ShortestPathVertexArrayWritable, Text,
IntWritable> peer)
+ throws IOException, KeeperException, InterruptedException {
+
+ KeyValuePair<ShortestPathVertex, ShortestPathVertexArrayWritable> next =
null;
+ while ((next = peer.readNext()) != null) {
+ adjacencyList.put(next.getKey(), (ShortestPathVertex[]) next.getValue()
+ .toArray());
+ vertexLookupMap.put(next.getKey().getName(), next.getKey());
+ }
+
+ masterTask = peer.getPeerName(0);
+
+ // initial message bypass
+ ShortestPathVertex startVertex = vertexLookupMap.get(peer
+ .getConfiguration().get(START_VERTEX));
+
+ if (startVertex != null) {
+ startVertex.setCost(0);
+ sendMessageToNeighbors(peer, startVertex);
+ }
+ }
+
+ @Override
+ public void cleanup(
+ BSPPeer<ShortestPathVertex, ShortestPathVertexArrayWritable, Text,
IntWritable> peer) {
+ // write our map into hdfs
+ for (Entry<ShortestPathVertex, ShortestPathVertex[]> entry : adjacencyList
+ .entrySet()) {
+ try {
+ peer.write(new Text(entry.getKey().getName()), new IntWritable(entry
+ .getKey().getCost()));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
}
/**
@@ -118,11 +137,12 @@ public class ShortestPaths extends Short
* @throws KeeperException
* @throws InterruptedException
*/
- private boolean broadcastUpdatesMade(BSPPeer peer, String master, int
updates)
- throws IOException, KeeperException, InterruptedException {
- peer.send(master, new IntegerMessage(peer.getPeerName(), updates));
+ private boolean broadcastUpdatesMade(
+ BSPPeer<ShortestPathVertex, ShortestPathVertexArrayWritable, Text,
IntWritable> peer,
+ int updates) throws IOException, KeeperException, InterruptedException {
+ peer.send(masterTask, new IntegerMessage(peer.getPeerName(), updates));
peer.sync();
- if (peer.getPeerName().equals(master)) {
+ if (peer.getPeerName().equals(masterTask)) {
int count = 0;
IntegerMessage message;
while ((message = (IntegerMessage) peer.getCurrentMessage()) != null) {
@@ -145,103 +165,88 @@ public class ShortestPaths extends Short
* to. <br/>
* It sends the current cost to the adjacent vertex + the edge weight. If
cost
* will be infinity we just going to send infinity, because summing the
weight
- * will cause an integer overflow resulting in negative weights.
+ * will cause an integer overflow resulting in negative cost.
*
* @param peer The peer we got through the BSP method.
* @param id The vertex to all adjacent vertices the new cost has to be send.
* @throws IOException
*/
- private void sendMessageToNeighbors(BSPPeer peer, ShortestPathVertex id)
- throws IOException {
- List<ShortestPathVertex> outgoingEdges = adjacencyList.get(id);
+ private void sendMessageToNeighbors(
+ BSPPeer<ShortestPathVertex, ShortestPathVertexArrayWritable, Text,
IntWritable> peer,
+ ShortestPathVertex id) throws IOException {
+ ShortestPathVertex[] outgoingEdges = adjacencyList.get(id);
for (ShortestPathVertex adjacent : outgoingEdges) {
- int mod = Math.abs((adjacent.getId() % peer.getAllPeerNames().length));
- peer.send(peerNames[mod], new IntegerMessage(adjacent.getName(), id
- .getCost() == Integer.MAX_VALUE ? id.getCost() : id.getCost()
- + adjacent.getWeight()));
+ int mod = Math.abs((adjacent.hashCode() %
peer.getAllPeerNames().length));
+ peer.send(peer.getPeerName(mod), new IntegerMessage(adjacent.getName(),
+ id.getCost() == Integer.MAX_VALUE ? id.getCost() : id.getCost()
+ + adjacent.getWeight()));
}
}
public static void printUsage() {
- System.out.println("Single Source Shortest Path Example:");
- System.out
- .println("<Startvertex name> <optional: output path> <optional: path
to own adjacency list textfile!>");
+ System.out.println("Usage: <startNode> <output path> <input path>");
+ }
+
+ public static void printOutput(Configuration conf) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ LOG.info("-------------------- RESULTS --------------------");
+ FileStatus[] stati = fs.listStatus(new Path(conf.get("bsp.output.dir")));
+ for (FileStatus status : stati) {
+ if (!status.isDir() && !status.getPath().getName().endsWith(".crc")) {
+ Path path = status.getPath();
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+ Text key = new Text();
+ IntWritable value = new IntWritable();
+ int x = 0;
+ while (reader.next(key, value)) {
+ LOG.info(key.toString() + " | " + value.get());
+ x++;
+ if(x > 3)
+ break;
+ }
+ reader.close();
+ }
+ }
}
public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException, InstantiationException,
IllegalAccessException {
- printUsage();
+ if (args.length < 3) {
+ printUsage();
+ System.exit(-1);
+ }
// BSP job configuration
HamaConfiguration conf = new HamaConfiguration();
- conf.set(SHORTEST_PATHS_START_VERTEX_ID, "Frankfurt");
- System.out.println("Setting default start vertex to \"Frankfurt\"!");
- conf.set(OUT_PATH, "sssp/output");
- Path adjacencyListPath = null;
-
- if (args.length > 0) {
- conf.set(SHORTEST_PATHS_START_VERTEX_ID, args[0]);
- System.out.println("Setting start vertex to " + args[0] + "!");
-
- if (args.length > 1) {
- conf.set(OUT_PATH, args[1]);
- System.out.println("Using new output folder: " + args[1]);
- }
-
- if (args.length > 2) {
- adjacencyListPath = new Path(args[2]);
- }
-
- }
-
- Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList = null;
- if (adjacencyListPath == null)
- adjacencyList = ShortestPathsGraphLoader.loadGraph();
-
BSPJob bsp = new BSPJob(conf, ShortestPaths.class);
// Set the job name
bsp.setJobName("Single Source Shortest Path");
- bsp.setBspClass(ShortestPaths.class);
-
- // Set the task size as a number of GroomServer
- BSPJobClient jobClient = new BSPJobClient(conf);
- ClusterStatus cluster = jobClient.getClusterStatus(true);
- Collection<String> activeGrooms = cluster.getActiveGroomNames().keySet();
- String[] grooms = activeGrooms.toArray(new String[activeGrooms.size()]);
+ conf.set(START_VERTEX, args[0]);
+ bsp.setOutputPath(new Path(args[1]));
+ bsp.setInputPath(new Path(args[2]));
- LOG.info("Starting data partitioning...");
- if (adjacencyList == null) {
- conf = (HamaConfiguration) partition(conf, adjacencyListPath, grooms);
- } else {
- conf = (HamaConfiguration) partitionExample(conf, adjacencyList, grooms);
- }
- LOG.info("Finished!");
+ bsp.setBspClass(ShortestPaths.class);
+ bsp.setInputFormat(SequenceFileInputFormat.class);
+ bsp.setPartitioner(HashPartitioner.class);
+ bsp.setOutputFormat(SequenceFileOutputFormat.class);
+ bsp.setOutputKeyClass(Text.class);
+ bsp.setOutputValueClass(IntWritable.class);
- bsp.setNumBspTask(cluster.getGroomServers());
+ BSPJobClient jobClient = new BSPJobClient(conf);
+ ClusterStatus cluster = jobClient.getClusterStatus(false);
+ // Use max tasks
+ bsp.setNumBspTask(6);
long startTime = System.currentTimeMillis();
if (bsp.waitForCompletion(true)) {
+ printOutput(conf);
System.out.println("Job Finished in "
+ (double) (System.currentTimeMillis() - startTime) / 1000.0
+ " seconds");
- printOutput(FileSystem.get(conf), conf);
}
}
- @Override
- public void cleanup(BSPPeer peer) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void setup(BSPPeer peer) throws IOException, KeeperException,
- InterruptedException {
- // TODO Auto-generated method stub
-
- }
-
}
Modified:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java?rev=1202506&r1=1202505&r2=1202506&view=diff
==============================================================================
---
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java
(original)
+++
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java
Wed Nov 16 02:01:55 2011
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.examples.graph;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-public class ShortestPathsGraphLoader {
-
- static Map<ShortestPathVertex, List<ShortestPathVertex>> loadGraph() {
-
- Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList = new
HashMap<ShortestPathVertex, List<ShortestPathVertex>>();
- String[] cities = new String[] { "Frankfurt", "Mannheim", "Wuerzburg",
- "Stuttgart", "Kassel", "Karlsruhe", "Erfurt", "Nuernberg", "Augsburg",
- "Muenchen" };
-
- for (String city : cities) {
- if (city.equals("Frankfurt")) {
- List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
- list.add(new ShortestPathVertex(85, "Mannheim"));
- list.add(new ShortestPathVertex(173, "Kassel"));
- list.add(new ShortestPathVertex(217, "Wuerzburg"));
- adjacencyList.put(new ShortestPathVertex(0, city), list);
- } else if (city.equals("Stuttgart")) {
- List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
- list.add(new ShortestPathVertex(183, "Nuernberg"));
- adjacencyList.put(new ShortestPathVertex(0, city), list);
- } else if (city.equals("Kassel")) {
- List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
- list.add(new ShortestPathVertex(502, "Muenchen"));
- list.add(new ShortestPathVertex(173, "Frankfurt"));
- adjacencyList.put(new ShortestPathVertex(0, city), list);
- } else if (city.equals("Erfurt")) {
- List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
- list.add(new ShortestPathVertex(186, "Wuerzburg"));
- adjacencyList.put(new ShortestPathVertex(0, city), list);
- } else if (city.equals("Wuerzburg")) {
- List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
- list.add(new ShortestPathVertex(217, "Frankfurt"));
- list.add(new ShortestPathVertex(168, "Erfurt"));
- list.add(new ShortestPathVertex(103, "Nuernberg"));
- adjacencyList.put(new ShortestPathVertex(0, city), list);
- } else if (city.equals("Mannheim")) {
- List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
- list.add(new ShortestPathVertex(80, "Karlsruhe"));
- list.add(new ShortestPathVertex(85, "Frankfurt"));
- adjacencyList.put(new ShortestPathVertex(0, city), list);
- } else if (city.equals("Karlsruhe")) {
- List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
- list.add(new ShortestPathVertex(250, "Augsburg"));
- list.add(new ShortestPathVertex(80, "Mannheim"));
- adjacencyList.put(new ShortestPathVertex(0, city), list);
- } else if (city.equals("Augsburg")) {
- List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
- list.add(new ShortestPathVertex(250, "Karlsruhe"));
- list.add(new ShortestPathVertex(84, "Muenchen"));
- adjacencyList.put(new ShortestPathVertex(0, city), list);
- } else if (city.equals("Nuernberg")) {
- List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
- list.add(new ShortestPathVertex(183, "Stuttgart"));
- list.add(new ShortestPathVertex(167, "Muenchen"));
- list.add(new ShortestPathVertex(103, "Wuerzburg"));
- adjacencyList.put(new ShortestPathVertex(0, city), list);
- } else if (city.equals("Muenchen")) {
- List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
- list.add(new ShortestPathVertex(167, "Nuernberg"));
- list.add(new ShortestPathVertex(173, "Kassel"));
- list.add(new ShortestPathVertex(84, "Augsburg"));
- adjacencyList.put(new ShortestPathVertex(0, city), list);
- }
- }
- return adjacencyList;
- }
-
-}
Modified:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/Vertex.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/Vertex.java?rev=1202506&r1=1202505&r2=1202506&view=diff
==============================================================================
---
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/Vertex.java
(original)
+++
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/Vertex.java
Wed Nov 16 02:01:55 2011
@@ -21,11 +21,10 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.hama.examples.graph.partitioning.PartitionableWritable;
+import org.apache.hadoop.io.Writable;
-public class Vertex implements PartitionableWritable {
+public class Vertex implements Writable {
- protected int id;
protected String name;
public Vertex() {
@@ -35,24 +34,21 @@ public class Vertex implements Partition
public Vertex(String name) {
super();
this.name = name;
- this.id = name.hashCode();
}
@Override
public void readFields(DataInput in) throws IOException {
- this.id = in.readInt();
this.name = in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
- out.writeInt(id);
out.writeUTF(name);
}
@Override
public int hashCode() {
- return id;
+ return name.hashCode();
}
@Override
@@ -69,11 +65,6 @@ public class Vertex implements Partition
return true;
}
- @Override
- public int getId() {
- return id;
- }
-
public String getName() {
return name;
}
Added:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/VertexArrayWritable.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/VertexArrayWritable.java?rev=1202506&view=auto
==============================================================================
---
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/VertexArrayWritable.java
(added)
+++
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/VertexArrayWritable.java
Wed Nov 16 02:01:55 2011
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.graph;
+
+import org.apache.hadoop.io.ArrayWritable;
+
+public class VertexArrayWritable extends ArrayWritable {
+
+ public VertexArrayWritable() {
+ super(Vertex.class);
+ }
+
+}