Author: edwardyoon
Date: Thu Nov 24 13:20:15 2011
New Revision: 1205819
URL: http://svn.apache.org/viewvc?rev=1205819&view=rev
Log:
To reduce memory usage, the vertexLookup structure was changed from a Map to a List.
Added:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPathVertexMessage.java
Modified:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java
Added:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPathVertexMessage.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPathVertexMessage.java?rev=1205819&view=auto
==============================================================================
---
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPathVertexMessage.java
(added)
+++
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPathVertexMessage.java
Thu Nov 24 13:20:15 2011
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hama.bsp.BSPMessage;
+
+public class ShortestPathVertexMessage extends BSPMessage {
+ // Message pairing a ShortestPathVertex tag with an int payload; ShortestPaths sends it with the neighbor vertex as tag and a candidate cost as data.
+ ShortestPathVertex tag;
+ int data;
+ // No-arg constructor: leaves tag null and data at 0 until readFields() fills them in.
+ public ShortestPathVertexMessage() {
+ super();
+ }
+ // Creates a message for the given vertex tag carrying the given int payload.
+ public ShortestPathVertexMessage(ShortestPathVertex tag, int data) {
+ super();
+ this.tag = tag;
+ this.data = data;
+ }
+ // Serializes, in order: the tag's name (UTF), the tag's weight (int), then data (int).
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(tag.getName()); // NOTE(review): tag is null after the no-arg constructor, so write() before readFields() would throw NullPointerException - confirm that cannot happen
+ out.writeInt(tag.getWeight());
+ out.writeInt(data);
+ }
+ // Reads the fields in the same order write() emits them and rebuilds the tag from weight and name.
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ String name = in.readUTF();
+ int weight = in.readInt();
+ tag = new ShortestPathVertex(weight, name); // NOTE(review): the tag's cost is never written by write(), so presumably the rebuilt vertex carries ShortestPathVertex's default cost - confirm
+ data = in.readInt();
+ }
+ // Returns the vertex this message is tagged with.
+ @Override
+ public ShortestPathVertex getTag() {
+ return tag;
+ }
+ // Returns the int payload, boxed to Integer.
+ @Override
+ public Integer getData() {
+ return data;
+ }
+
+}
\ No newline at end of file
Modified:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java?rev=1205819&r1=1205818&r2=1205819&view=diff
==============================================================================
---
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java
(original)
+++
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java
Thu Nov 24 13:20:15 2011
@@ -18,15 +18,22 @@
package org.apache.hama.examples;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
@@ -45,7 +52,7 @@ public class ShortestPaths extends
public static final Log LOG = LogFactory.getLog(ShortestPaths.class);
public static final String START_VERTEX = "shortest.paths.start.vertex.name";
- private final HashMap<String, ShortestPathVertex> vertexLookupMap = new
HashMap<String, ShortestPathVertex>();
+ private final List<ShortestPathVertex> vertexLookup = new
ArrayList<ShortestPathVertex>();
private final HashMap<ShortestPathVertex, ShortestPathVertex[]>
adjacencyList = new HashMap<ShortestPathVertex, ShortestPathVertex[]>();
private String masterTask;
@@ -55,13 +62,15 @@ public class ShortestPaths extends
throws IOException, KeeperException, InterruptedException {
boolean updated = true;
while (updated) {
- int updatesMade = 0;
peer.sync();
- IntegerMessage msg = null;
+ int updatesMade = 0;
+ ShortestPathVertexMessage msg = null;
Deque<ShortestPathVertex> updatedQueue = new
LinkedList<ShortestPathVertex>();
- while ((msg = (IntegerMessage) peer.getCurrentMessage()) != null) {
- ShortestPathVertex vertex = vertexLookupMap.get(msg.getTag());
+ while ((msg = (ShortestPathVertexMessage) peer.getCurrentMessage()) !=
null) {
+ int index = Collections.binarySearch(vertexLookup, msg.getTag());
+ ShortestPathVertex vertex = vertexLookup.get(index);
+
// check if we need an distance update
if (vertex.getCost() > msg.getData()) {
updatesMade++;
@@ -69,7 +78,7 @@ public class ShortestPaths extends
vertex.setCost(msg.getData());
}
}
- // synchonize with all grooms if there were updates
+
updated = broadcastUpdatesMade(peer, updatesMade);
// send updates to the adjacents of the updated vertices
for (ShortestPathVertex vertex : updatedQueue) {
@@ -78,26 +87,28 @@ public class ShortestPaths extends
}
}
- @Override
public void setup(
BSPPeer<ShortestPathVertex, ShortestPathVertexArrayWritable, Text,
IntWritable> peer)
throws IOException, KeeperException, InterruptedException {
-
KeyValuePair<ShortestPathVertex, ShortestPathVertexArrayWritable> next =
null;
+ ShortestPathVertex startVertex = null;
+
while ((next = peer.readNext()) != null) {
+ if (next.getKey().getName().equals(
+ peer.getConfiguration().get(START_VERTEX))) {
+ next.getKey().setCost(0);
+ startVertex = next.getKey();
+ }
adjacencyList.put(next.getKey(), (ShortestPathVertex[]) next.getValue()
.toArray());
- vertexLookupMap.put(next.getKey().getName(), next.getKey());
+ vertexLookup.add(next.getKey());
}
+ Collections.sort(vertexLookup);
masterTask = peer.getPeerName(0);
// initial message bypass
- ShortestPathVertex startVertex = vertexLookupMap.get(peer
- .getConfiguration().get(START_VERTEX));
-
if (startVertex != null) {
- startVertex.setCost(0);
sendMessageToNeighbors(peer, startVertex);
}
}
@@ -109,8 +120,10 @@ public class ShortestPaths extends
// write our map into hdfs
for (Entry<ShortestPathVertex, ShortestPathVertex[]> entry : adjacencyList
.entrySet()) {
- peer.write(new Text(entry.getKey().getName()), new IntWritable(entry
- .getKey().getCost()));
+ int cost = entry.getKey().getCost();
+ if (cost < Integer.MAX_VALUE) {
+ peer.write(new Text(entry.getKey().getName()), new IntWritable(cost));
+ }
}
}
@@ -166,14 +179,40 @@ public class ShortestPaths extends
BSPPeer<ShortestPathVertex, ShortestPathVertexArrayWritable, Text,
IntWritable> peer,
ShortestPathVertex id) throws IOException {
ShortestPathVertex[] outgoingEdges = adjacencyList.get(id);
+
for (ShortestPathVertex adjacent : outgoingEdges) {
- int mod = Math.abs((adjacent.hashCode() %
peer.getAllPeerNames().length));
- peer.send(peer.getPeerName(mod), new IntegerMessage(adjacent.getName(),
+ String target = peer.getPeerName(Math.abs((adjacent.hashCode() % peer
+ .getAllPeerNames().length)));
+
+ peer.send(target, new ShortestPathVertexMessage(adjacent,
id.getCost() == Integer.MAX_VALUE ? id.getCost() : id.getCost()
+ adjacent.getWeight()));
}
}
+ static void printOutput(Configuration conf) throws IOException { // Prints the first records of each file under the "bsp.output.dir" path to stdout.
+ FileSystem fs = FileSystem.get(conf);
+ FileStatus[] stati = fs.listStatus(new Path(conf.get("bsp.output.dir"))); // NOTE(review): a missing "bsp.output.dir" key is not handled here - confirm it is always set before this runs
+ for (FileStatus status : stati) {
+ if (!status.isDir() && !status.getPath().getName().endsWith(".crc")) { // skip directories and ".crc" files
+ Path path = status.getPath();
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+ Text key = new Text();
+ IntWritable value = new IntWritable();
+ int x = 0; // number of records printed from this file so far
+ while (reader.next(key, value)) {
+ System.out.println(key.toString() + " | " + value.get());
+ x++;
+ if (x > 10) { // after the 11th record, print "..." and stop reading this file
+ System.out.println("...");
+ break;
+ }
+ }
+ reader.close(); // NOTE(review): not in a finally block, so an IOException thrown by next() skips this close
+ }
+ }
+ }
+
public static void printUsage() {
System.out.println("Usage: <startNode> <output path> <input path>");
}
@@ -206,10 +245,10 @@ public class ShortestPaths extends
long startTime = System.currentTimeMillis();
if (bsp.waitForCompletion(true)) {
+ printOutput(conf);
System.out.println("Job Finished in "
+ (double) (System.currentTimeMillis() - startTime) / 1000.0
+ " seconds");
}
}
-
}