Author: edwardyoon
Date: Fri Sep 21 04:57:10 2012
New Revision: 1388324
URL: http://svn.apache.org/viewvc?rev=1388324&view=rev
Log:
Merge to revision 1387508
Removed:
hama/trunk/core/src/main/java/org/apache/hama/bsp/WritableComparator.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/WritableSerialization.java
hama/trunk/core/src/main/java/org/apache/hama/pipes/
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritableSerialization.java
hama/trunk/jdbm/
Modified:
hama/trunk/CHANGES.txt
hama/trunk/bin/hama
hama/trunk/conf/hama-default.xml
hama/trunk/core/pom.xml
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
hama/trunk/graph/pom.xml
hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
hama/trunk/pom.xml
hama/trunk/src/assemble/bin.xml
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Fri Sep 21 04:57:10 2012
@@ -4,8 +4,6 @@ Release 0.6 (unreleased changes)
NEW FEATURES
- HAMA-601: Hama Streaming (tjungblut)
-
BUG FIXES
HAMA-635: Number of vertices value is inconsistent among tasks (Yuesheng Hu
via tjungblut)
@@ -14,8 +12,7 @@ Release 0.6 (unreleased changes)
HAMA-608: LocalRunner should honor the configured queues (tjungblut)
IMPROVEMENTS
-
- HAMA-642: Make GraphRunner disk based (tjungblut)
+
HAMA-597: Split a GraphJobRunner into multiple classes (edwardyoon &
tjungblut)
HAMA-557: Implement Checkpointing service in Hama (surajmenon)
HAMA-587: Synchronization Client should provide API's to store and retrieve
information among peers and BSPMaster (surajmenon)
@@ -51,34 +48,34 @@ Release 0.5 - April 10, 2012
IMPROVEMENTS
- HAMA-593: Improve RPC scalability (Mayank Mishra via tjungblut)
- HAMA-584: Change Pagerank IO format to human-readable text for easy debug
(tjungblut via edwardyoon)
- HAMA-590: Fix TestSubmitGraphJob tests (tjungblut)
- HAMA-582: Task's error logs should be displayed on client-end when job is
failed (edwardyoon)
- HAMA-580: Improve input of graph module (tjungblut)
- HAMA-579: Add multiple aggregators (tjungblut)
- HAMA-576: Improve sendMessages in Vertex (tjungblut)
- HAMA-575: Generify graph package (tjungblut)
- HAMA-571: Provide graph repair function in GraphJobRunner (tjungblut)
- HAMA-521: Improve message buffering to save memory (Thomas Jungblut via
edwardyoon)
- HAMA-494: Remove hard-coded webapp path in HttpServer (edwardyoon)
- HAMA-562: Record Reader/Writer objects should be initialized (edwardyoon)
- HAMA-555: Separate bin and src distributions (edwardyoon)
- HAMA-548: Update 0.23.0-SNAPSHOT to 0.23.1 in pom file of yarn module
(edwardyoon)
- HAMA-545: Include the API and other docs in the Hama release (Suraj Menon
via edwardyoon)
- HAMA-543: Make best effort to start BSP Task on the host where the input
split is located. (Suraj Menon via edwardyoon)
- HAMA-527: Update commons-configuration version (edwardyoon)
- HAMA-499: Refactor clearZKNodes() in BSPMaster (Apurv Verma via tjungblut)
- HAMA-485: Fill Counters with useful information (tjungblut)
- HAMA-497: Switch the trunk to Hadoop 1.0 based (edwardyoon)
- HAMA-445: Make configurable checkpointing (Suraj Menon via edwardyoon)
- HAMA-498: BSPTask should periodically ping its parent (Suraj Menon via
edwardyoon)
- HAMA-513: Move message classes to somewhere from bsp package. (tjungblut)
- HAMA-484: Counters should be accessible in client (tjungblut)
- HAMA-483: Remove old and deprecated BSP API (tjungblut)
- HAMA-514: Add maven-gpg-plugin to parent POM file (edwardyoon)
- HAMA-510: Add sendMessageToNeighbors() to Vertex (tjungblut)
- HAMA-502: Message API Improvement (edwardyoon)
+ HAMA-593: Improve RPC scalability (Mayank Mishra via tjungblut)
+ HAMA-584: Change Pagerank IO format to human-readable text for easy debug
(tjungblut via edwardyoon)
+ HAMA-590: Fix TestSubmitGraphJob tests (tjungblut)
+ HAMA-582: Task's error logs should be displayed on client-end when job is
failed (edwardyoon)
+ HAMA-580: Improve input of graph module (tjungblut)
+ HAMA-579: Add multiple aggregators (tjungblut)
+ HAMA-576: Improve sendMessages in Vertex (tjungblut)
+ HAMA-575: Generify graph package (tjungblut)
+ HAMA-571: Provide graph repair function in GraphJobRunner (tjungblut)
+ HAMA-521: Improve message buffering to save memory (Thomas Jungblut via
edwardyoon)
+ HAMA-494: Remove hard-coded webapp path in HttpServer (edwardyoon)
+ HAMA-562: Record Reader/Writer objects should be initialized (edwardyoon)
+ HAMA-555: Separate bin and src distributions (edwardyoon)
+ HAMA-548: Update 0.23.0-SNAPSHOT to 0.23.1 in pom file of yarn module
(edwardyoon)
+ HAMA-545: Include the API and other docs in the Hama release (Suraj Menon
via edwardyoon)
+ HAMA-543: Make best effort to start BSP Task on the host where the input
split is located. (Suraj Menon via edwardyoon)
+ HAMA-527: Update commons-configuration version (edwardyoon)
+ HAMA-499: Refactor clearZKNodes() in BSPMaster (Apurv Verma via tjungblut)
+ HAMA-485: Fill Counters with useful information (tjungblut)
+ HAMA-497: Switch the trunk to Hadoop 1.0 based (edwardyoon)
+ HAMA-445: Make configurable checkpointing (Suraj Menon via edwardyoon)
+ HAMA-498: BSPTask should periodically ping its parent (Suraj Menon via
edwardyoon)
+ HAMA-513: Move message classes to somewhere from bsp package. (tjungblut)
+ HAMA-484: Counters should be accessible in client (tjungblut)
+ HAMA-483: Remove old and deprecated BSP API (tjungblut)
+ HAMA-514: Add maven-gpg-plugin to parent POM file (edwardyoon)
+ HAMA-510: Add sendMessageToNeighbors() to Vertex (tjungblut)
+ HAMA-502: Message API Improvement (edwardyoon)
Release 0.4 - February 5, 2012
Modified: hama/trunk/bin/hama
URL:
http://svn.apache.org/viewvc/hama/trunk/bin/hama?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/bin/hama (original)
+++ hama/trunk/bin/hama Fri Sep 21 04:57:10 2012
@@ -60,7 +60,6 @@ if [ $# = 0 ]; then
echo " zookeeper run a Zookeeper server"
echo " job manipulate BSP jobs"
echo " jar <jar> run a jar file"
- echo " pipes run a pipe job"
echo " or"
echo " CLASSNAME run the class named CLASSNAME"
echo "Most commands print help when invoked w/o parameters."
@@ -161,8 +160,6 @@ elif [ "$COMMAND" = "zookeeper" ] ; then
CLASS='org.apache.hama.ZooKeeperRunner'
elif [ "$COMMAND" = "job" ] ; then
CLASS='org.apache.hama.bsp.BSPJobClient'
-elif [ "$COMMAND" = "pipes" ] ; then
- CLASS='org.apache.hama.pipes.Submitter'
elif [ "$COMMAND" = "jar" ] ; then
CLASS=org.apache.hama.util.RunJar
BSP_OPTS="$BSP_OPTS"
Modified: hama/trunk/conf/hama-default.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/conf/hama-default.xml?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/conf/hama-default.xml (original)
+++ hama/trunk/conf/hama-default.xml Fri Sep 21 04:57:10 2012
@@ -140,6 +140,7 @@
<value>10000</value>
<description>The default timeout period for checking groom server
health.</description>
</property>
+
<property>
<name>hama.messenger.max.cached.connections</name>
<value>100</value>
@@ -149,20 +150,7 @@
but it trades more memory.
</description>
</property>
- <property>
- <name>hama.graph.in.memory</name>
- <value>false</value>
- <description>true if the graph should completely stored in memory (in a
map),
- default is false, so it will be stored on disk under the configured
"hama.graph.storage.path".
- </description>
- </property>
- <property>
- <name>hama.graph.storage.path</name>
- <value>/tmp/graph_storage/</value>
- <description>The default place where the graph (vertices, edges etc) is
stored on each node's hdd.
- </description>
- </property>
-
+
<!--
Beginning of properties that are directly mapped from ZooKeeper's zoo.cfg.
All properties with an "hama.zookeeper.property." prefix are converted for
Modified: hama/trunk/core/pom.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/core/pom.xml?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/core/pom.xml (original)
+++ hama/trunk/core/pom.xml Fri Sep 21 04:57:10 2012
@@ -150,11 +150,6 @@
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.hama</groupId>
- <artifactId>hama-jdbm</artifactId>
- <version>${project.version}</version>
- </dependency>
</dependencies>
<build>
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Fri
Sep 21 04:57:10 2012
@@ -22,6 +22,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
@@ -39,7 +40,7 @@ public class BSPMessageBundle<M extends
public static final Log LOG = LogFactory.getLog(BSPMessageBundle.class);
- private HashMap<String, ArrayList<M>> messages = new HashMap<String,
ArrayList<M>>();
+ private HashMap<String, LinkedList<M>> messages = new HashMap<String,
LinkedList<M>>();
private HashMap<String, Class<M>> classCache = new HashMap<String,
Class<M>>();
public BSPMessageBundle() {
@@ -53,7 +54,8 @@ public class BSPMessageBundle<M extends
public void addMessage(M message) {
String className = message.getClass().getName();
if (!messages.containsKey(className)) {
- ArrayList<M> list = new ArrayList<M>();
+ // use linked list because we're just iterating over them
+ LinkedList<M> list = new LinkedList<M>();
list.add(message);
messages.put(className, list);
} else {
@@ -65,7 +67,7 @@ public class BSPMessageBundle<M extends
// here we use an arraylist, because we know the size and outside may need
// random access
List<M> mergeList = new ArrayList<M>(messages.size());
- for (ArrayList<M> c : messages.values()) {
+ for (LinkedList<M> c : messages.values()) {
mergeList.addAll(c);
}
return mergeList;
@@ -76,9 +78,9 @@ public class BSPMessageBundle<M extends
// writes the k/v mapping size
out.writeInt(messages.size());
if (messages.size() > 0) {
- for (Entry<String, ArrayList<M>> entry : messages.entrySet()) {
+ for (Entry<String, LinkedList<M>> entry : messages.entrySet()) {
out.writeUTF(entry.getKey());
- ArrayList<M> messageList = entry.getValue();
+ LinkedList<M> messageList = entry.getValue();
out.writeInt(messageList.size());
for (M msg : messageList) {
msg.write(out);
@@ -91,14 +93,14 @@ public class BSPMessageBundle<M extends
@SuppressWarnings("unchecked")
public void readFields(DataInput in) throws IOException {
if (messages == null) {
- messages = new HashMap<String, ArrayList<M>>();
+ messages = new HashMap<String, LinkedList<M>>();
}
int numMessages = in.readInt();
if (numMessages > 0) {
for (int entries = 0; entries < numMessages; entries++) {
String className = in.readUTF();
int size = in.readInt();
- ArrayList<M> msgList = new ArrayList<M>();
+ LinkedList<M> msgList = new LinkedList<M>();
messages.put(className, msgList);
Class<M> clazz = null;
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Fri Sep
21 04:57:10 2012
@@ -20,7 +20,6 @@ package org.apache.hama.bsp;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.Map.Entry;
@@ -277,7 +276,6 @@ public final class BSPPeerImpl<K1, V1, K
}
}
}
- LOG.info("Moving to local cache files: " + files.toString() +" INITIALLY
IT WAS: " + Arrays.toString(DistributedCache.getCacheFiles(conf)));
if (files.length() > 0) {
DistributedCache.addLocalFiles(conf, files.toString());
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Fri
Sep 21 04:57:10 2012
@@ -57,10 +57,6 @@ import org.apache.hama.ipc.JobSubmission
* running BSP's.
*/
public class LocalBSPRunner implements JobSubmissionProtocol {
-
- public static final String BSP_LOCAL_DIR = "bsp.local.dir";
- public static final String BSP_LOCAL_TASKS_MAXIMUM =
"bsp.local.tasks.maximum";
-
private static final Log LOG = LogFactory.getLog(LocalBSPRunner.class);
private static final String IDENTIFIER = "localrunner";
@@ -86,9 +82,9 @@ public class LocalBSPRunner implements J
super();
this.conf = conf;
- maxTasks = conf.getInt(BSP_LOCAL_TASKS_MAXIMUM, 20);
+ maxTasks = conf.getInt("bsp.local.tasks.maximum", 20);
- String path = conf.get(BSP_LOCAL_DIR);
+ String path = conf.get("bsp.local.dir");
if (path != null && !path.isEmpty()) {
WORKING_DIR = path;
}
Modified: hama/trunk/graph/pom.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/pom.xml?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/graph/pom.xml (original)
+++ hama/trunk/graph/pom.xml Fri Sep 21 04:57:10 2012
@@ -43,11 +43,6 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.hama</groupId>
- <artifactId>hama-jdbm</artifactId>
- <version>${project.version}</version>
- </dependency>
</dependencies>
<build>
<finalName>hama-graph-${project.version}</finalName>
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
---
hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
(original)
+++
hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
Fri Sep 21 04:57:10 2012
@@ -87,7 +87,7 @@ public abstract class AbstractAggregator
public IntWritable getTimesAggregated() {
return new IntWritable(timesAggregated);
}
-
+
@Override
public String toString() {
return "VAL=" + getValue();
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Fri Sep
21 04:57:10 2012
@@ -22,7 +22,6 @@ import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPJob;
import org.apache.hama.bsp.Combiner;
@@ -73,8 +72,7 @@ public class GraphJob extends BSPJob {
/**
* Set the Vertex ID class for the job.
*/
- public void setVertexIDClass(
- @SuppressWarnings("rawtypes") Class<? extends WritableComparable> cls)
+ public void setVertexIDClass(Class<? extends Writable> cls)
throws IllegalStateException {
conf.setClass(VERTEX_ID_CLASS_ATTR, cls, Writable.class);
}
@@ -131,8 +129,8 @@ public class GraphJob extends BSPJob {
}
@Override
- public void setPartitioner(
- @SuppressWarnings("rawtypes") Class<? extends Partitioner> theClass) {
+ public void setPartitioner(@SuppressWarnings("rawtypes")
+ Class<? extends Partitioner> theClass) {
super.setPartitioner(theClass);
conf.setBoolean(VERTEX_GRAPH_RUNTIME_PARTIONING, true);
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Fri Sep 21 04:57:10 2012
@@ -18,10 +18,8 @@
package org.apache.hama.graph;
import java.io.IOException;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -31,26 +29,18 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.Partitioner;
-import org.apache.hama.bsp.WritableComparator;
-import org.apache.hama.bsp.WritableSerialization;
import org.apache.hama.bsp.sync.SyncException;
-import org.apache.hama.jdbm.DB;
-import org.apache.hama.jdbm.DBMaker;
import org.apache.hama.util.KeyValuePair;
/**
@@ -60,20 +50,8 @@ import org.apache.hama.util.KeyValuePair
* @param <E> the value type of an edge.
* @param <M> the value type of a vertex.
*/
-public final class GraphJobRunner<V extends WritableComparable<V>, E extends
Writable, M extends Writable>
- extends BSP<Writable, Writable, Writable, Writable, GraphJobMessage>
- implements Serializable {
-
- public static final String HAMA_GRAPH_MULTI_STEP_PARTITIONING_INTERVAL =
"hama.graph.partitioning.batch.bytes";
- public static final String HAMA_GRAPH_SELF_REF = "hama.graph.self.ref";
- public static final String HAMA_GRAPH_STORAGE_PATH =
"hama.graph.storage.path";
- public static final String HAMA_GRAPH_IN_MEMORY = "hama.graph.in.memory";
- public static final String HAMA_GRAPH_VERTEX_CLASS =
"hama.graph.vertex.class";
- public static final String HAMA_GRAPH_MAX_ITERATION =
"hama.graph.max.iteration";
- public static final String MESSAGE_COMBINER_CLASS =
"hama.vertex.message.combiner.class";
- public static final String GRAPH_REPAIR = "hama.graph.repair";
-
- private static final long serialVersionUID = 1L;
+public final class GraphJobRunner<V extends Writable, E extends Writable, M
extends Writable>
+ extends BSP<Writable, Writable, Writable, Writable, GraphJobMessage> {
private static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
@@ -83,12 +61,14 @@ public final class GraphJobRunner<V exte
public static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2";
public static final Text FLAG_MESSAGE_COUNTS = new
Text(S_FLAG_MESSAGE_COUNTS);
- private transient Configuration conf;
- private transient Combiner<M> combiner;
- private transient Partitioner<V, M> partitioner;
+ public static final String MESSAGE_COMBINER_CLASS =
"hama.vertex.message.combiner.class";
+ public static final String GRAPH_REPAIR = "hama.graph.repair";
+
+ private Configuration conf;
+ private Combiner<M> combiner;
+ private Partitioner<V, M> partitioner;
- private transient Map<V, Vertex<V, E, M>> vertices;
- private transient DB db;
+ private Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, M>>();
private boolean updated = true;
private int globalUpdateCounts = 0;
@@ -98,14 +78,14 @@ public final class GraphJobRunner<V exte
private int maxIteration = -1;
private long iteration;
- Class<V> vertexIdClass;
- Class<M> vertexValueClass;
- Class<E> edgeValueClass;
- Class<Vertex<V, E, M>> vertexClass;
+ private Class<V> vertexIdClass;
+ private Class<M> vertexValueClass;
+ private Class<E> edgeValueClass;
+ private Class<Vertex<V, E, M>> vertexClass;
- private transient AggregationRunner<V, E, M> aggregationRunner;
+ private AggregationRunner<V, E, M> aggregationRunner;
- private transient BSPPeer<Writable, Writable, Writable, Writable,
GraphJobMessage> peer;
+ private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage>
peer;
@Override
public final void setup(
@@ -145,32 +125,18 @@ public final class GraphJobRunner<V exte
// loop over vertices and do their computation
doSuperstep(messages, peer);
}
-
- write(peer);
}
/**
* Just write <ID as Writable, Value as Writable> pair as a result. Note that
* this will also be executed when failure happened.
*/
- private void write(
- BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
- throws IOException {
-
- Set<V> keySet = vertices.keySet();
- for (V value : keySet) {
- Vertex<V, E, M> e = vertices.get(value);
- peer.write(e.getVertexID(), e.getValue());
- }
- }
-
@Override
public final void cleanup(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
- // remove the DB files if they exist
- if (db != null) {
- db.close();
+ for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) {
+ peer.write(e.getValue().getVertexID(), e.getValue().getValue());
}
}
@@ -205,9 +171,7 @@ public final class GraphJobRunner<V exte
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
int activeVertices = 0;
- Set<V> keySet = vertices.keySet();
- for (V key : keySet) {
- Vertex<V, E, M> vertex = vertices.get(key);
+ for (Vertex<V, E, M> vertex : vertices.values()) {
List<M> msgs = messages.get(vertex.getVertexID());
// If there are newly received messages, restart.
if (vertex.isHalted() && msgs != null) {
@@ -243,9 +207,7 @@ public final class GraphJobRunner<V exte
private void doInitialSuperstep(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
- Set<V> keySet = vertices.keySet();
- for (V key : keySet) {
- Vertex<V, E, M> vertex = vertices.get(key);
+ for (Vertex<V, E, M> vertex : vertices.values()) {
List<M> singletonList = Collections.singletonList(vertex.getValue());
M lastValue = vertex.getValue();
vertex.compute(singletonList.iterator());
@@ -260,7 +222,8 @@ public final class GraphJobRunner<V exte
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
this.peer = peer;
this.conf = peer.getConfiguration();
- maxIteration = peer.getConfiguration().getInt(HAMA_GRAPH_MAX_ITERATION,
-1);
+ maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration",
+ -1);
vertexIdClass = (Class<V>) conf.getClass(GraphJob.VERTEX_ID_CLASS_ATTR,
Text.class, Writable.class);
@@ -270,7 +233,7 @@ public final class GraphJobRunner<V exte
GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR, IntWritable.class,
Writable.class);
vertexClass = (Class<Vertex<V, E, M>>) conf.getClass(
- HAMA_GRAPH_VERTEX_CLASS, Vertex.class);
+ "hama.graph.vertex.class", Vertex.class);
// set the classes statically, so we can save memory per message
GraphJobMessage.VERTEX_ID_CLASS = vertexIdClass;
@@ -291,35 +254,6 @@ public final class GraphJobRunner<V exte
conf);
}
- if (!conf.getBoolean(HAMA_GRAPH_IN_MEMORY, false)) {
-
- String storagePath = conf.get(HAMA_GRAPH_STORAGE_PATH);
- if (storagePath == null) {
- storagePath = "/tmp/graph_storage/";
- }
-
- storagePath += peer.getTaskId().toString();
-
- try {
- LocalFileSystem local = FileSystem.getLocal(conf);
- local.mkdirs(new Path(storagePath));
- } catch (IOException e) {
- throw new RuntimeException("Could not create \"" + storagePath
- + "\", nested exception was: ", e);
- }
-
- db = DBMaker.openFile(storagePath + "/graph.db").disableLocking()
- .disableTransactions().deleteFilesAfterClose().useRandomAccessFile()
- .make();
-
- Comparator<V> writableComparator = new WritableComparator<V>();
- vertices = db.createTreeMap("graph-db", writableComparator,
- new WritableSerialization<V>(vertexIdClass, peer.getConfiguration()),
- new VertexWritableSerialization<Vertex<V, E, M>>(vertexClass, this));
- } else {
- vertices = new HashMap<V, Vertex<V, E, M>>();
- }
-
aggregationRunner = new AggregationRunner<V, E, M>();
aggregationRunner.setupAggregators(peer);
}
@@ -348,7 +282,7 @@ public final class GraphJobRunner<V exte
final int partitioningSteps = partitionMultiSteps(peer, splitSize);
final long interval = splitSize / partitioningSteps;
- final boolean selfReference = conf.getBoolean(HAMA_GRAPH_SELF_REF, false);
+ final boolean selfReference = conf.getBoolean("hama.graph.self.ref",
false);
/*
* Several partitioning constants end
@@ -569,7 +503,7 @@ public final class GraphJobRunner<V exte
}
int steps = (int) (maxSplitSize / conf.getInt( // 20 mb
- HAMA_GRAPH_MULTI_STEP_PARTITIONING_INTERVAL, 20000000)) + 1;
+ "hama.graph.multi.step.partitioning.interval", 20000000)) + 1;
for (String peerName : peer.getAllPeerNames()) {
MapWritable temp = new MapWritable();
@@ -584,7 +518,7 @@ public final class GraphJobRunner<V exte
for (Entry<Writable, Writable> e : x.entrySet()) {
multiSteps = ((IntWritable) e.getValue()).get();
}
- LOG.info(peer.getPeerName() + ": Number of partitioning supersteps: " +
multiSteps);
+ LOG.info(peer.getPeerName() + ": " + multiSteps);
return multiSteps;
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Fri Sep 21
04:57:10 2012
@@ -17,8 +17,6 @@
*/
package org.apache.hama.graph;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -26,15 +24,13 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.Partitioner;
public abstract class Vertex<V extends Writable, E extends Writable, M extends
Writable>
- implements VertexInterface<V, E, M>, Writable {
+ implements VertexInterface<V, E, M> {
- transient GraphJobRunner<?, ?, ?> runner;
+ GraphJobRunner<?, ?, ?> runner;
private V vertexID;
private M value;
@@ -222,81 +218,6 @@ public abstract class Vertex<V extends W
return true;
}
- @SuppressWarnings("unchecked")
- @Override
- public void readFields(DataInput in) throws IOException {
- votedToHalt = in.readBoolean();
- vertexID = (V) ReflectionUtils.newInstance(runner.vertexIdClass, null);
- vertexID.readFields(in);
- if (in.readBoolean()) {
- value = (M) ReflectionUtils.newInstance(runner.vertexValueClass, null);
- value.readFields(in);
- }
-
- int edges = WritableUtils.readVInt(in);
- ArrayList<Edge<V, E>> list = new ArrayList<Edge<V, E>>(edges);
- for (int i = 0; i < edges; i++) {
- V adjacentId = (V) ReflectionUtils
- .newInstance(runner.vertexIdClass, null);
- adjacentId.readFields(in);
- E edgeValue = null;
- if (in.readBoolean()) {
- edgeValue = (E) ReflectionUtils
- .newInstance(runner.edgeValueClass, null);
- edgeValue.readFields(in);
- }
- list.add(new Edge<V, E>(adjacentId, edgeValue));
- }
-
- this.setEdges(list);
- readInternal(in);
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeBoolean(votedToHalt);
- V vId = getVertexID();
- vId.write(out);
- M val = getValue();
- serializeNull(out, val);
-
- List<Edge<V, E>> edges = getEdges();
- int length = edges == null ? 0 : edges.size();
- WritableUtils.writeVInt(out, length);
- for (Edge<V, E> edge : edges) {
- edge.getDestinationVertexID().write(out);
- serializeNull(out, edge.getValue());
- }
-
- writeInternal(out);
- }
-
- /**
- * A write method to let the user save its own state in the vertex class.
- */
- protected void writeInternal(DataOutput out) throws IOException {
- }
-
- /**
- * A read method to let the user save its own state in the vertex class.
- */
- protected void readInternal(DataInput out) throws IOException {
- }
-
- /**
- * Serializes data null-safe by writing a boolean that is only true when the
- * given writable is not null.
- */
- protected static void serializeNull(DataOutput out, Writable writable)
- throws IOException {
- if (writable == null) {
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
- writable.write(out);
- }
- }
-
@Override
public String toString() {
return getVertexID() + (getValue() != null ? " = " + getValue() : "")
Modified: hama/trunk/pom.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/pom.xml?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/pom.xml (original)
+++ hama/trunk/pom.xml Fri Sep 21 04:57:10 2012
@@ -248,7 +248,6 @@
<module>examples</module>
<module>yarn</module>
<module>ml</module>
- <module>jdbm</module>
<module>dist</module>
</modules>
@@ -322,8 +321,6 @@
<exclude>.classpath/**</exclude>
<exclude>.project</exclude>
<exclude>**/*.asc</exclude>
- <exclude>**/target/*</exclude>
- <exclude>**/bin/**</exclude>
<exclude>**/logs/**</exclude>
<exclude>**/docs/**</exclude>
<exclude>CHANGES.txt</exclude>
Modified: hama/trunk/src/assemble/bin.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/src/assemble/bin.xml?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/src/assemble/bin.xml (original)
+++ hama/trunk/src/assemble/bin.xml Fri Sep 21 04:57:10 2012
@@ -74,19 +74,6 @@
</excludes>
<outputDirectory>../hama-${project.version}/</outputDirectory>
</fileSet>
- <fileSet>
- <directory>../jdbm/target</directory>
- <includes>
- <include>hama-*.jar</include>
- </includes>
- <excludes>
- <exclude>*sources.jar</exclude>
- <exclude>*tests.jar</exclude>
- <exclude>*javadoc.jar</exclude>
- </excludes>
- <outputDirectory>../hama-${project.version}/</outputDirectory>
- </fileSet>
-
<fileSet>
<directory>../</directory>