Author: edwardyoon
Date: Fri Jan 3 07:03:58 2014
New Revision: 1555020
URL: http://svn.apache.org/r1555020
Log:
HAMA-783: Efficient InMemory Storage for Vertices
Modified:
hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/IDSkippingIterator.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java
hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java
Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
(original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java Fri
Jan 3 07:03:58 2014
@@ -69,7 +69,6 @@ public class SSSP {
}
voteToHalt();
}
-
}
public static class MinIntCombiner extends Combiner<IntWritable> {
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
Fri Jan 3 07:03:58 2014
@@ -17,12 +17,13 @@
*/
package org.apache.hama.graph;
+import static com.google.common.base.Preconditions.checkArgument;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -31,11 +32,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.graph.IDSkippingIterator.Strategy;
-import static com.google.common.base.Preconditions.checkArgument;
-
@SuppressWarnings("rawtypes")
public final class DiskVerticesInfo<V extends WritableComparable, E extends
Writable, M extends Writable>
implements VerticesInfo<V, E, M> {
@@ -67,12 +67,12 @@ public final class DiskVerticesInfo<V ex
private Vertex<V, E, M> cachedVertexInstance;
private int currentStep = 0;
private int index = 0;
- private Configuration conf;
+ private HamaConfiguration conf;
private GraphJobRunner<V, E, M> runner;
private String staticFile;
@Override
- public void init(GraphJobRunner<V, E, M> runner, Configuration conf,
+ public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
TaskAttemptID attempt) throws IOException {
this.runner = runner;
this.conf = conf;
@@ -92,7 +92,7 @@ public final class DiskVerticesInfo<V ex
}
@Override
- public void cleanup(Configuration conf, TaskAttemptID attempt)
+ public void cleanup(HamaConfiguration conf, TaskAttemptID attempt)
throws IOException {
IOUtils.cleanup(null, softGraphPartsDos, softGraphPartsNextIterationDos,
staticGraphPartsDis, softGraphPartsDis);
@@ -122,7 +122,7 @@ public final class DiskVerticesInfo<V ex
@Override
public void removeVertex(V vertexID) {
- throw new UnsupportedOperationException ("Not yet implemented");
+ throw new UnsupportedOperationException("Not yet implemented");
}
/**
@@ -176,7 +176,7 @@ public final class DiskVerticesInfo<V ex
@Override
public void finishRemovals() {
- throw new UnsupportedOperationException ("Not yet implemented");
+ throw new UnsupportedOperationException("Not yet implemented");
}
private static long[] copy(ArrayList<Long> lst) {
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Fri Jan 3 07:03:58 2014
@@ -248,12 +248,14 @@ public final class GraphJobRunner<V exte
int activeVertices = 0;
this.changedVertexCnt = 0;
vertices.startSuperstep();
+
/*
* We iterate over our messages and vertices in sorted order. That means
* that we need to seek the first vertex that has the same ID as the
* currentMessage or the first vertex that is active.
*/
IDSkippingIterator<V, E, M> iterator = vertices.skippingIterator();
+
// note that can't skip inactive vertices because we have to rewrite the
// complete vertex file in each iteration
while (iterator.hasNext(
@@ -266,9 +268,11 @@ public final class GraphJobRunner<V exte
iterable = iterate(currentMessage, (V) currentMessage.getVertexId(),
vertex, peer);
}
+
if (iterable != null && vertex.isHalted()) {
vertex.setActive();
}
+
if (!vertex.isHalted()) {
M lastValue = vertex.getValue();
if (iterable == null) {
@@ -285,7 +289,7 @@ public final class GraphJobRunner<V exte
getAggregationRunner().aggregateVertex(lastValue, vertex);
activeVertices++;
}
-
+
// note that we even need to rewrite the vertex if it is halted for
// consistency reasons
vertices.finishVertexComputation(vertex);
@@ -352,7 +356,10 @@ public final class GraphJobRunner<V exte
IDSkippingIterator<V, E, M> skippingIterator = vertices.skippingIterator();
while (skippingIterator.hasNext()) {
Vertex<V, E, M> vertex = skippingIterator.next();
+
M lastValue = vertex.getValue();
+ // Calls setup method.
+ vertex.setup(conf);
vertex.compute(Collections.singleton(vertex.getValue()));
getAggregationRunner().aggregateVertex(lastValue, vertex);
vertices.finishVertexComputation(vertex);
@@ -456,9 +463,6 @@ public final class GraphJobRunner<V exte
vertex.addEdge(edge);
}
} else {
- vertex.setRunner(this);
- vertex.setup(conf);
-
if (selfReference) {
vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
}
@@ -469,8 +473,6 @@ public final class GraphJobRunner<V exte
}
}
// add last vertex.
- vertex.setRunner(this);
- vertex.setup(conf);
if (selfReference) {
vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/IDSkippingIterator.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IDSkippingIterator.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
---
hama/trunk/graph/src/main/java/org/apache/hama/graph/IDSkippingIterator.java
(original)
+++
hama/trunk/graph/src/main/java/org/apache/hama/graph/IDSkippingIterator.java
Fri Jan 3 07:03:58 2014
@@ -17,6 +17,8 @@
*/
package org.apache.hama.graph;
+import java.io.IOException;
+
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -54,8 +56,9 @@ public abstract class IDSkippingIterator
* Skips nothing, accepts everything.
*
* @return true if the strategy found a new item, false if not.
+ * @throws IOException
*/
- public boolean hasNext() {
+ public boolean hasNext() throws IOException {
return hasNext(null, Strategy.ALL);
}
@@ -63,8 +66,9 @@ public abstract class IDSkippingIterator
* Skips until the given strategy is satisfied.
*
* @return true if the strategy found a new item, false if not.
+ * @throws IOException
*/
- public abstract boolean hasNext(V e, Strategy strat);
+ public abstract boolean hasNext(V e, Strategy strat) throws IOException;
/**
* @return a found vertex that can be read safely.
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
Fri Jan 3 07:03:58 2014
@@ -17,14 +17,18 @@
*/
package org.apache.hama.graph;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.Map;
+import java.util.TreeMap;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.TaskAttemptID;
/**
@@ -36,52 +40,69 @@ import org.apache.hama.bsp.TaskAttemptID
*/
public final class ListVerticesInfo<V extends WritableComparable<V>, E extends
Writable, M extends Writable>
implements VerticesInfo<V, E, M> {
+ private GraphJobRunner<V, E, M> runner;
+ Vertex<V, E, M> v;
- private final SortedSet<Vertex<V, E, M>> vertices = new TreeSet<Vertex<V, E,
M>>();
- // We will use this variable to make vertex removals, so we don't invoke GC
too many times.
- private final Vertex<V, E, M> vertexTemplate = GraphJobRunner.<V, E, M>
newVertexInstance(GraphJobRunner.VERTEX_CLASS);
+ private final Map<V, byte[]> verticesMap = new TreeMap<V, byte[]>();
+
+ private ByteArrayOutputStream bos = null;
+ private DataOutputStream dos = null;
+ private ByteArrayInputStream bis = null;
+ private DataInputStream dis = null;
+
+ @Override
+ public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
+ TaskAttemptID attempt) throws IOException {
+ this.runner = runner;
+ }
@Override
- public void addVertex(Vertex<V, E, M> vertex) {
- if (!vertices.add(vertex)) {
- throw new UnsupportedOperationException("Vertex with ID: " +
vertex.getVertexID() + " already exists!");
+ public void addVertex(Vertex<V, E, M> vertex) throws IOException {
+ if (verticesMap.containsKey(vertex.getVertexID())) {
+ throw new UnsupportedOperationException("Vertex with ID: "
+ + vertex.getVertexID() + " already exists!");
+ } else {
+ verticesMap.put(vertex.getVertexID(), serialize(vertex));
}
}
@Override
public void removeVertex(V vertexID) throws UnsupportedOperationException {
- vertexTemplate.setVertexID(vertexID);
-
- if (!vertices.remove(vertexTemplate)) {
- throw new UnsupportedOperationException("Vertex with ID: " + vertexID +
" not found on this peer.");
+ if (verticesMap.containsKey(vertexID)) {
+ verticesMap.remove(vertexID);
+ } else {
+ throw new UnsupportedOperationException("Vertex with ID: " + vertexID
+ + " not found on this peer.");
}
}
public void clear() {
- vertices.clear();
+ verticesMap.clear();
}
@Override
public int size() {
- return this.vertices.size();
+ return this.verticesMap.size();
}
@Override
public IDSkippingIterator<V, E, M> skippingIterator() {
return new IDSkippingIterator<V, E, M>() {
- Iterator<Vertex<V, E, M>> it = vertices.iterator();
- Vertex<V, E, M> v;
+ Iterator<V> it = verticesMap.keySet().iterator();
@Override
public boolean hasNext(V msgId,
- org.apache.hama.graph.IDSkippingIterator.Strategy strat) {
+ org.apache.hama.graph.IDSkippingIterator.Strategy strat)
+ throws IOException {
if (it.hasNext()) {
- v = it.next();
+ V vertexKey = it.next();
+ v = deserialize(verticesMap.get(vertexKey));
while (!strat.accept(v, msgId)) {
if (it.hasNext()) {
- v = it.next();
+ vertexKey = it.next();
+ v = deserialize(verticesMap.get(vertexKey));
} else {
return false;
}
@@ -97,7 +118,8 @@ public final class ListVerticesInfo<V ex
@Override
public Vertex<V, E, M> next() {
if (v == null) {
- throw new UnsupportedOperationException("You must invoke hasNext
before ask for the next vertex.");
+ throw new UnsupportedOperationException(
+ "You must invoke hasNext before ask for the next vertex.");
}
Vertex<V, E, M> tmp = v;
@@ -108,9 +130,27 @@ public final class ListVerticesInfo<V ex
};
}
- @Override
- public void finishVertexComputation(Vertex<V, E, M> vertex) {
+ public byte[] serialize(Vertex<V, E, M> vertex) throws IOException {
+ bos = new ByteArrayOutputStream();
+ dos = new DataOutputStream(bos);
+ vertex.write(dos);
+ return bos.toByteArray();
+ }
+
+ public Vertex<V, E, M> deserialize(byte[] serialized) throws IOException {
+ bis = new ByteArrayInputStream(serialized);
+ dis = new DataInputStream(bis);
+ v = GraphJobRunner.<V, E, M>
newVertexInstance(GraphJobRunner.VERTEX_CLASS);
+ v.readFields(dis);
+ v.setRunner(runner);
+ return v;
+ }
+
+ @Override
+ public void finishVertexComputation(Vertex<V, E, M> vertex)
+ throws IOException {
+ verticesMap.put(vertex.getVertexID(), serialize(vertex));
}
@Override
@@ -122,13 +162,13 @@ public final class ListVerticesInfo<V ex
public void finishRemovals() {
}
- @Override
+ @Override
public void finishSuperstep() {
}
@Override
- public void cleanup(Configuration conf, TaskAttemptID attempt)
+ public void cleanup(HamaConfiguration conf, TaskAttemptID attempt)
throws IOException {
}
@@ -137,11 +177,4 @@ public final class ListVerticesInfo<V ex
public void startSuperstep() throws IOException {
}
-
- @Override
- public void init(GraphJobRunner<V, E, M> runner, Configuration conf,
- TaskAttemptID attempt) throws IOException {
-
- }
-
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
---
hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
(original)
+++
hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
Fri Jan 3 07:03:58 2014
@@ -27,9 +27,9 @@ import org.apache.directmemory.memory.Po
import org.apache.directmemory.serialization.Serializer;
import org.apache.directmemory.serialization.kryo.KryoSerializer;
import org.apache.directmemory.utils.CacheValuesIterable;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.util.ReflectionUtils;
@@ -37,123 +37,128 @@ import org.apache.hama.util.ReflectionUt
* An off heap version of a {@link org.apache.hama.graph.Vertex} storage.
*/
public class OffHeapVerticesInfo<V extends WritableComparable<?>, E extends
Writable, M extends Writable>
- implements VerticesInfo<V, E, M> {
+ implements VerticesInfo<V, E, M> {
- public static final String DM_STRICT_ITERATOR = "dm.iterator.strict";
- public static final String DM_BUFFERS = "dm.buffers";
- public static final String DM_SIZE = "dm.size";
- public static final String DM_CAPACITY = "dm.capacity";
- public static final String DM_CONCURRENCY = "dm.concurrency";
- public static final String DM_DISPOSAL_TIME = "dm.disposal.time";
- public static final String DM_SERIALIZER = "dm.serializer";
- public static final String DM_SORTED = "dm.sorted";
-
- private CacheService<V, Vertex<V, E, M>> vertices;
-
- private boolean strict;
- private GraphJobRunner<V, E, M> runner;
-
- @Override
- public void init(GraphJobRunner<V, E, M> runner, Configuration conf,
TaskAttemptID attempt) throws IOException {
- this.runner = runner;
- this.strict = conf.getBoolean(DM_STRICT_ITERATOR, true);
- DirectMemory<V, Vertex<V, E, M>> dm = new DirectMemory<V, Vertex<V, E,
M>>()
- .setNumberOfBuffers(conf.getInt(DM_BUFFERS, 100))
- .setSize(conf.getInt(DM_SIZE, 102400))
-
.setSerializer(ReflectionUtils.newInstance(conf.getClass(DM_SERIALIZER,
KryoSerializer.class, Serializer.class)))
- .setDisposalTime(conf.getInt(DM_DISPOSAL_TIME, 3600000));
- if (conf.getBoolean(DM_SORTED, true)) {
- dm.setMap(new ConcurrentSkipListMap<V, Pointer<Vertex<V, E,
M>>>());
+ public static final String DM_STRICT_ITERATOR = "dm.iterator.strict";
+ public static final String DM_BUFFERS = "dm.buffers";
+ public static final String DM_SIZE = "dm.size";
+ public static final String DM_CAPACITY = "dm.capacity";
+ public static final String DM_CONCURRENCY = "dm.concurrency";
+ public static final String DM_DISPOSAL_TIME = "dm.disposal.time";
+ public static final String DM_SERIALIZER = "dm.serializer";
+ public static final String DM_SORTED = "dm.sorted";
+
+ private CacheService<V, Vertex<V, E, M>> vertices;
+
+ private boolean strict;
+ private GraphJobRunner<V, E, M> runner;
+
+ @Override
+ public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
+ TaskAttemptID attempt) throws IOException {
+ this.runner = runner;
+ this.strict = conf.getBoolean(DM_STRICT_ITERATOR, true);
+ DirectMemory<V, Vertex<V, E, M>> dm = new DirectMemory<V, Vertex<V, E,
M>>()
+ .setNumberOfBuffers(conf.getInt(DM_BUFFERS, 100))
+ .setSize(conf.getInt(DM_SIZE, 102400))
+ .setSerializer(
+ ReflectionUtils.newInstance(conf.getClass(DM_SERIALIZER,
+ KryoSerializer.class, Serializer.class)))
+ .setDisposalTime(conf.getInt(DM_DISPOSAL_TIME, 3600000));
+ if (conf.getBoolean(DM_SORTED, true)) {
+ dm.setMap(new ConcurrentSkipListMap<V, Pointer<Vertex<V, E, M>>>());
+ } else {
+ dm.setInitialCapacity(conf.getInt(DM_CAPACITY, 1000))
+ .setConcurrencyLevel(conf.getInt(DM_CONCURRENCY, 10));
+ }
+
+ this.vertices = dm.newCacheService();
+
+ }
+
+ @Override
+ public void cleanup(HamaConfiguration conf, TaskAttemptID attempt)
+ throws IOException {
+ vertices.dump();
+ }
+
+ public void addVertex(Vertex<V, E, M> vertex) {
+ vertices.put(vertex.getVertexID(), vertex);
+ }
+
+ @Override
+ public void finishAdditions() {
+ }
+
+ @Override
+ public void startSuperstep() throws IOException {
+ }
+
+ @Override
+ public void finishSuperstep() throws IOException {
+ }
+
+ @Override
+ public void finishVertexComputation(Vertex<V, E, M> vertex)
+ throws IOException {
+ vertices.put(vertex.getVertexID(), vertex);
+ }
+
+ public void clear() {
+ vertices.clear();
+ }
+
+ public int size() {
+ return (int) this.vertices.entries();
+ }
+
+ @Override
+ public IDSkippingIterator<V, E, M> skippingIterator() {
+ final Iterator<Vertex<V, E, M>> vertexIterator = new
CacheValuesIterable<V, Vertex<V, E, M>>(
+ vertices, strict).iterator();
+
+ return new IDSkippingIterator<V, E, M>() {
+ int currentIndex = 0;
+
+ Vertex<V, E, M> currentVertex = null;
+
+ @Override
+ public boolean hasNext(V e,
+ org.apache.hama.graph.IDSkippingIterator.Strategy strat) {
+ if (currentIndex < vertices.entries()) {
+
+ Vertex<V, E, M> next = vertexIterator.next();
+ while (!strat.accept(next, e)) {
+ currentIndex++;
+ }
+ currentVertex = next;
+ return true;
} else {
- dm.setInitialCapacity(conf.getInt(DM_CAPACITY, 1000))
- .setConcurrencyLevel(conf.getInt(DM_CONCURRENCY, 10));
+ return false;
}
+ }
- this.vertices = dm.newCacheService();
-
- }
-
- @Override
- public void cleanup(Configuration conf, TaskAttemptID attempt) throws
IOException {
- vertices.dump();
- }
-
- public void addVertex(Vertex<V, E, M> vertex) {
- vertices.put(vertex.getVertexID(), vertex);
- }
-
- @Override
- public void finishAdditions() {
- }
-
- @Override
- public void startSuperstep() throws IOException {
- }
-
- @Override
- public void finishSuperstep() throws IOException {
- }
-
- @Override
- public void finishVertexComputation(Vertex<V, E, M> vertex) throws
IOException {
- vertices.put(vertex.getVertexID(), vertex);
- }
-
- public void clear() {
- vertices.clear();
- }
-
- public int size() {
- return (int) this.vertices.entries();
- }
-
- @Override
- public IDSkippingIterator<V, E, M> skippingIterator() {
- final Iterator<Vertex<V, E, M>> vertexIterator =
- new CacheValuesIterable<V, Vertex<V, E, M>>(vertices,
strict).iterator();
-
- return new IDSkippingIterator<V, E, M>() {
- int currentIndex = 0;
-
- Vertex<V, E, M> currentVertex = null;
-
- @Override
- public boolean hasNext(V e,
-
org.apache.hama.graph.IDSkippingIterator.Strategy strat) {
- if (currentIndex < vertices.entries()) {
-
- Vertex<V, E, M> next = vertexIterator.next();
- while (!strat.accept(next, e)) {
- currentIndex++;
- }
- currentVertex = next;
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public Vertex<V, E, M> next() {
- currentIndex++;
- if (currentVertex.getRunner() == null) {
- currentVertex.setRunner(runner);
- }
- return currentVertex;
- }
-
- };
+ @Override
+ public Vertex<V, E, M> next() {
+ currentIndex++;
+ if (currentVertex.getRunner() == null) {
+ currentVertex.setRunner(runner);
+ }
+ return currentVertex;
+ }
- }
+ };
- @Override
- public void removeVertex(V vertexID) {
- throw new UnsupportedOperationException ("Not yet implemented");
- }
+ }
- @Override
- public void finishRemovals() {
- throw new UnsupportedOperationException ("Not yet implemented");
- }
+ @Override
+ public void removeVertex(V vertexID) {
+ throw new UnsupportedOperationException("Not yet implemented");
+ }
+
+ @Override
+ public void finishRemovals() {
+ throw new UnsupportedOperationException("Not yet implemented");
+ }
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Fri Jan 3
07:03:58 2014
@@ -71,7 +71,7 @@ public abstract class Vertex<V extends W
@Override
public void setup(HamaConfiguration conf) {
}
-
+
@Override
public void sendMessage(Edge<V, E> e, M msg) throws IOException {
runner.getPeer().send(getDestinationPeerName(e),
@@ -172,7 +172,7 @@ public abstract class Vertex<V extends W
@Override
public M getValue() {
- return value;
+ return this.value;
}
@Override
@@ -306,10 +306,11 @@ public abstract class Vertex<V extends W
}
if (in.readBoolean()) {
if (this.value == null) {
- value = GraphJobRunner.createVertexValue();
+ this.value = GraphJobRunner.createVertexValue();
}
- value.readFields(in);
+ this.value.readFields(in);
}
+
this.edges = new ArrayList<Edge<V, E>>();
if (in.readBoolean()) {
int num = in.readInt();
@@ -340,6 +341,7 @@ public abstract class Vertex<V extends W
out.writeBoolean(true);
vertexID.write(out);
}
+
if (value == null) {
out.writeBoolean(false);
} else {
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
Fri Jan 3 07:03:58 2014
@@ -38,7 +38,10 @@ public interface VertexInterface<V exten
extends WritableComparable<VertexInterface<V, E, M>> {
/**
- * Used to setup a vertex.
+ * This method is called once before the Vertex computation begins. Since the
+ * Vertex object is serializable, variables in your Vertex program always
+ * should be declared a s static.
+ *
*/
public void setup(HamaConfiguration conf);
@@ -78,12 +81,14 @@ public interface VertexInterface<V exten
public void sendMessage(V destinationVertexID, M msg) throws IOException;
/**
- * Sends a message to add a new vertex through the partitioner to the
appropriate BSP peer
+ * Sends a message to add a new vertex through the partitioner to the
+ * appropriate BSP peer
*/
- public void addVertex(V vertexID, List<Edge<V, E>> edges, M value) throws
IOException;
+ public void addVertex(V vertexID, List<Edge<V, E>> edges, M value)
+ throws IOException;
/**
- * Removes current Vertex from local peer.
+ * Removes current Vertex from local peer.
*/
public void remove() throws IOException;
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java Fri
Jan 3 07:03:58 2014
@@ -19,9 +19,9 @@ package org.apache.hama.graph;
import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.TaskAttemptID;
/**
@@ -37,13 +37,13 @@ public interface VerticesInfo<V extends
/**
* Initialization of internal structures.
*/
- public void init(GraphJobRunner<V, E, M> runner, Configuration conf,
+ public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
TaskAttemptID attempt) throws IOException;
/**
* Cleanup of internal structures.
*/
- public void cleanup(Configuration conf, TaskAttemptID attempt)
+ public void cleanup(HamaConfiguration conf, TaskAttemptID attempt)
throws IOException;
/**
@@ -92,5 +92,4 @@ public interface VerticesInfo<V extends
public int size();
public IDSkippingIterator<V, E, M> skippingIterator();
-
}
Modified:
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
---
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java
(original)
+++
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java
Fri Jan 3 07:03:58 2014
@@ -22,10 +22,10 @@ import java.util.List;
import junit.framework.TestCase;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.graph.example.PageRank;
import org.apache.hama.graph.example.PageRank.PageRankVertex;
@@ -36,7 +36,7 @@ public class TestDiskVerticesInfo extend
@Test
public void testDiskVerticesInfoLifeCycle() throws Exception {
DiskVerticesInfo<Text, NullWritable, DoubleWritable> info = new
DiskVerticesInfo<Text, NullWritable, DoubleWritable>();
- Configuration conf = new Configuration();
+ HamaConfiguration conf = new HamaConfiguration();
conf.set(GraphJob.VERTEX_CLASS_ATTR, PageRankVertex.class.getName());
conf.set(GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR,
NullWritable.class.getName());
Modified:
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
---
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java
(original)
+++
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestOffHeapVerticesInfo.java
Fri Jan 3 07:03:58 2014
@@ -17,28 +17,28 @@
*/
package org.apache.hama.graph;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.graph.example.PageRank.PageRankVertex;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
public class TestOffHeapVerticesInfo {
@Test
public void testOffHeapVerticesInfoLifeCycle() throws Exception {
OffHeapVerticesInfo<Text, NullWritable, DoubleWritable> info = new
OffHeapVerticesInfo<Text, NullWritable, DoubleWritable>();
- Configuration conf = new Configuration();
+ HamaConfiguration conf = new HamaConfiguration();
conf.set(GraphJob.VERTEX_CLASS_ATTR, PageRankVertex.class.getName());
conf.set(GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR,
NullWritable.class.getName());
@@ -121,7 +121,7 @@ public class TestOffHeapVerticesInfo {
public void testAdditionWithDefaults() throws Exception {
OffHeapVerticesInfo<Text, NullWritable, DoubleWritable> verticesInfo =
new OffHeapVerticesInfo<Text, NullWritable, DoubleWritable>();
- Configuration conf = new Configuration();
+ HamaConfiguration conf = new HamaConfiguration();
verticesInfo.init(null, conf, null);
Vertex<Text, NullWritable, DoubleWritable> vertex = new PageRankVertex();
vertex.setVertexID(new Text("some-id"));
@@ -133,7 +133,7 @@ public class TestOffHeapVerticesInfo {
public void testMassiveAdditionWithDefaults() throws Exception {
OffHeapVerticesInfo<Text, NullWritable, DoubleWritable> verticesInfo =
new OffHeapVerticesInfo<Text, NullWritable, DoubleWritable>();
- Configuration conf = new Configuration();
+ HamaConfiguration conf = new HamaConfiguration();
verticesInfo.init(null, conf, null);
assertEquals("vertices info size should be 0 at startup", 0,
verticesInfo.size());
Random r = new Random();
Modified:
hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java
URL:
http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java?rev=1555020&r1=1555019&r2=1555020&view=diff
==============================================================================
---
hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java
(original)
+++
hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java
Fri Jan 3 07:03:58 2014
@@ -35,9 +35,9 @@ import java.util.*;
*/
public class SemiClusteringVertex extends
Vertex<Text, DoubleWritable, SemiClusterMessage> {
- private int semiClusterMaximumVertexCount;
- private int graphJobMessageSentCount;
- private int graphJobVertexMaxClusterCount;
+ private static int semiClusterMaximumVertexCount;
+ private static int graphJobMessageSentCount;
+ private static int graphJobVertexMaxClusterCount;
@Override
public void setup(HamaConfiguration conf) {