Author: edwardyoon
Date: Fri Aug 16 12:03:59 2013
New Revision: 1514667
URL: http://svn.apache.org/r1514667
Log:
HAMA-767: Add vertex addition/removal APIs (Anastasis Andronidis via edwardyoon)
Added:
hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java
hama/trunk/examples/src/test/resources/dg.txt
Modified:
hama/trunk/CHANGES.txt
hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1514667&r1=1514666&r2=1514667&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Fri Aug 16 12:03:59 2013
@@ -4,6 +4,7 @@ Release 0.6.3 (unreleased changes)
NEW FEATURES
+ HAMA-767: Add vertex addition/removal APIs (Anastasis Andronidis via
edwardyoon)
HAMA-594: Semi-Clustering Algorithm Implementation (Renil Jeseph via
edwardyoon)
BUG FIXES
Added:
hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java?rev=1514667&view=auto
==============================================================================
---
hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java
(added)
+++
hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java
Fri Aug 16 12:03:59 2013
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.bsp.TextOutputFormat;
+import org.apache.hama.graph.Edge;
+import org.apache.hama.graph.GraphJob;
+import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexInputReader;
+import org.apache.hama.graph.GraphJobRunner.GraphJobCounter;
+
+/**
+ * This is an example of how to manipulate Graphs dynamically.
+ * The input of this example is a number in each row. We assume
+ * that there is a vertex with ID:1 which is responsible to create
+ * a sum vertex that will aggregate the values of the other
+ * vertices. During the aggregation, sum vertex will delete all
+ * other vertices.
+ *
+ * Input example:
+ * 1
+ * 2
+ * 3
+ * 4
+ *
+ * Output example:
+ * sum 10
+ */
+public class DynamicGraph {
+
+ public static class GraphTextReader extends
+ VertexInputReader<LongWritable, Text, Text, NullWritable, IntWritable> {
+
+ @Override
+ public boolean parseVertex(LongWritable key, Text value,
+ Vertex<Text, NullWritable, IntWritable> vertex) throws Exception {
+
+ vertex.setVertexID(value);
+ vertex.setValue(new IntWritable(Integer.parseInt(value.toString())));
+
+ return true;
+ }
+ }
+
+ public static class GraphVertex extends
+ Vertex<Text, NullWritable, IntWritable> {
+
+ private void createSumVertex() throws IOException {
+ if (this.getVertexID().toString().equals("1")) {
+ Text new_id = new Text("sum");
+ this.addVertex(new_id, new ArrayList<Edge<Text, NullWritable>>(), new
IntWritable(0));
+ }
+ }
+
+ private void sendAllValuesToSumAndRemove() throws IOException {
+ if (!this.getVertexID().toString().equals("sum")) {
+ this.sendMessage(new Text("sum"), this.getValue());
+ this.remove();
+ }
+ }
+
+ // this must run only on "sum" vertex
+ private void calculateSum(Iterable<IntWritable> msgs) throws IOException {
+ if (this.getVertexID().toString().equals("sum")) {
+ int s = 0;
+ for (IntWritable i : msgs) {
+ s += i.get();
+ }
+ s +=
this.getPeer().getCounter(GraphJobCounter.INPUT_VERTICES).getCounter();
+ this.setValue(new IntWritable(this.getValue().get() +s));
+ } else {
+ throw new UnsupportedOperationException("We have more vertecies than
we expected: " + this.getVertexID() + " " + this.getValue());
+ }
+ }
+
+ @Override
+ public void compute(Iterable<IntWritable> msgs) throws IOException {
+ if (this.getSuperstepCount() == 0) {
+ createSumVertex();
+ } else if (this.getSuperstepCount() == 1) {
+ sendAllValuesToSumAndRemove();
+ } else if (this.getSuperstepCount() == 2) {
+ calculateSum(msgs);
+ } else if (this.getSuperstepCount() == 3) {
+ this.voteToHalt();
+ }
+ }
+ }
+
+ public static void main(String[] args) throws IOException,
+ InterruptedException, ClassNotFoundException {
+ if (args.length != 2) {
+ printUsage();
+ }
+ HamaConfiguration conf = new HamaConfiguration(new Configuration());
+ GraphJob graphJob = createJob(args, conf);
+ long startTime = System.currentTimeMillis();
+ if (graphJob.waitForCompletion(true)) {
+ System.out.println("Job Finished in "
+ + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+ }
+ }
+
+ private static void printUsage() {
+ System.out.println("Usage: <input> <output>");
+ System.exit(-1);
+ }
+
+ private static GraphJob createJob(String[] args, HamaConfiguration conf)
throws IOException {
+ GraphJob graphJob = new GraphJob(conf, DynamicGraph.class);
+ graphJob.setJobName("Dynamic Graph");
+ graphJob.setVertexClass(GraphVertex.class);
+
+ graphJob.setInputPath(new Path(args[0]));
+ graphJob.setOutputPath(new Path(args[1]));
+
+ graphJob.setVertexIDClass(Text.class);
+ graphJob.setVertexValueClass(IntWritable.class);
+ graphJob.setEdgeValueClass(NullWritable.class);
+
+ graphJob.setInputFormat(TextInputFormat.class);
+
+ graphJob.setVertexInputReaderClass(GraphTextReader.class);
+ graphJob.setPartitioner(HashPartitioner.class);
+
+ graphJob.setOutputFormat(TextOutputFormat.class);
+ graphJob.setOutputKeyClass(Text.class);
+ graphJob.setOutputValueClass(IntWritable.class);
+
+ return graphJob;
+ }
+
+}
Added:
hama/trunk/examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java?rev=1514667&view=auto
==============================================================================
---
hama/trunk/examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java
(added)
+++
hama/trunk/examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java
Fri Aug 16 12:03:59 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.HamaConfiguration;
+import org.junit.Test;
+
+/**
+ * Testcase for {@link org.apache.hama.examples.DynamicGraph}
+ */
+public class DynamicGraphTest extends TestCase {
+ private static String OUTPUT = "/tmp/page-out";
+ private Configuration conf = new HamaConfiguration();
+ private FileSystem fs;
+
+ private void deleteTempDirs() {
+ try {
+ if (fs.exists(new Path(OUTPUT)))
+ fs.delete(new Path(OUTPUT), true);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void verifyResult() throws IOException {
+ FileStatus[] globStatus = fs.globStatus(new Path(OUTPUT + "/part-*"));
+ for (FileStatus fts : globStatus) {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(
+ fs.open(fts.getPath())));
+ String line = null;
+ while ((line = reader.readLine()) != null) {
+ String[] split = line.split("\t");
+ assertTrue(split[0].equals("sum"));
+ assertTrue(split[1].equals("11"));
+ System.out.println(split[0] + " : " + split[1]);
+ }
+ }
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ fs = FileSystem.get(conf);
+ }
+
+ @Test
+ public void test() throws IOException, InterruptedException,
ClassNotFoundException {
+ try {
+ DynamicGraph.main(new String[] {"src/test/resources/dg.txt", OUTPUT });
+ verifyResult();
+ } finally {
+ deleteTempDirs();
+ }
+ }
+
+}
Added: hama/trunk/examples/src/test/resources/dg.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/test/resources/dg.txt?rev=1514667&view=auto
==============================================================================
--- hama/trunk/examples/src/test/resources/dg.txt (added)
+++ hama/trunk/examples/src/test/resources/dg.txt Fri Aug 16 12:03:59 2013
@@ -0,0 +1,4 @@
+1
+2
+3
+4
\ No newline at end of file
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java?rev=1514667&r1=1514666&r2=1514667&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
Fri Aug 16 12:03:59 2013
@@ -21,6 +21,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -90,14 +91,18 @@ public final class AggregationRunner<V e
/**
* Runs the aggregators by sending their values to the master task.
+ * @param changedVertexCnt
*/
public void sendAggregatorValues(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
- int activeVertices) throws IOException {
+ int activeVertices, int changedVertexCnt) throws IOException {
// send msgCounts to the master task
MapWritable updatedCnt = new MapWritable();
updatedCnt.put(GraphJobRunner.FLAG_MESSAGE_COUNTS, new IntWritable(
activeVertices));
+ // send total number of vertices changes
+ updatedCnt.put(GraphJobRunner.FLAG_VERTEX_ALTER_COUNTER, new LongWritable(
+ changedVertexCnt));
// also send aggregated values to the master
if (aggregators != null) {
for (int i = 0; i < this.aggregators.length; i++) {
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java?rev=1514667&r1=1514666&r2=1514667&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
Fri Aug 16 12:03:59 2013
@@ -120,6 +120,11 @@ public final class DiskVerticesInfo<V ex
size++;
}
+ @Override
+ public void removeVertex(V vertexID) {
+ throw new UnsupportedOperationException ("Not yet implemented");
+ }
+
/**
* Serializes the vertex's soft parts to its file. If the vertex does not
have
* an index yet (e.G. at startup) you can provide -1 and it will be added to
@@ -169,6 +174,11 @@ public final class DiskVerticesInfo<V ex
lockedAdditions = true;
}
+ @Override
+ public void finishRemovals() {
+ throw new UnsupportedOperationException ("Not yet implemented");
+ }
+
private static long[] copy(ArrayList<Long> lst) {
long[] arr = new long[lst.size()];
for (int i = 0; i < arr.length; i++) {
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1514667&r1=1514666&r2=1514667&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Fri Aug 16 12:03:59 2013
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -63,7 +64,15 @@ public final class GraphJobRunner<V exte
public static final String S_FLAG_MESSAGE_COUNTS = "hama.0";
public static final String S_FLAG_AGGREGATOR_VALUE = "hama.1";
public static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2";
+ public static final String S_FLAG_VERTEX_INCREASE = "hama.3";
+ public static final String S_FLAG_VERTEX_DECREASE = "hama.4";
+ public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.5";
+ public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.6";
public static final Text FLAG_MESSAGE_COUNTS = new
Text(S_FLAG_MESSAGE_COUNTS);
+ public static final Text FLAG_VERTEX_INCREASE = new
Text(S_FLAG_VERTEX_INCREASE);
+ public static final Text FLAG_VERTEX_DECREASE = new
Text(S_FLAG_VERTEX_DECREASE);
+ public static final Text FLAG_VERTEX_ALTER_COUNTER = new
Text(S_FLAG_VERTEX_ALTER_COUNTER);
+ public static final Text FLAG_VERTEX_TOTAL_VERTICES = new
Text(S_FLAG_VERTEX_TOTAL_VERTICES);
public static final String MESSAGE_COMBINER_CLASS_KEY =
"hama.vertex.message.combiner.class";
public static final String VERTEX_CLASS_KEY = "hama.graph.vertex.class";
@@ -81,6 +90,7 @@ public final class GraphJobRunner<V exte
private VerticesInfo<V, E, M> vertices;
private boolean updated = true;
private int globalUpdateCounts = 0;
+ private int changedVertexCnt = 0;
private long numberVertices = 0;
// -1 is deactivated
@@ -165,9 +175,20 @@ public final class GraphJobRunner<V exte
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException, SyncException, InterruptedException {
+ if (isMasterTask(peer) && iteration == 1) {
+ MapWritable updatedCnt = new MapWritable();
+ updatedCnt.put(FLAG_VERTEX_TOTAL_VERTICES, new
LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES).getCounter())));
+ // send the updates from the master tasks back to the slaves
+ for (String peerName : peer.getAllPeerNames()) {
+ peer.send(peerName, new GraphJobMessage(updatedCnt));
+ }
+ }
+
// this is only done in every second iteration
if (isMasterTask(peer) && iteration > 1) {
MapWritable updatedCnt = new MapWritable();
+ // send total number of vertices.
+ updatedCnt.put(FLAG_VERTEX_TOTAL_VERTICES, new
LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES).getCounter())));
// exit if there's no update made
if (globalUpdateCounts == 0) {
updatedCnt.put(FLAG_MESSAGE_COUNTS, new
IntWritable(Integer.MIN_VALUE));
@@ -210,6 +231,7 @@ public final class GraphJobRunner<V exte
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
int activeVertices = 0;
+ this.changedVertexCnt = 0;
vertices.startSuperstep();
/*
* We iterate over our messages and vertices in sorted order. That means
@@ -255,7 +277,7 @@ public final class GraphJobRunner<V exte
}
vertices.finishSuperstep();
- aggregationRunner.sendAggregatorValues(peer, activeVertices);
+ aggregationRunner.sendAggregatorValues(peer, activeVertices,
this.changedVertexCnt);
iteration++;
}
@@ -310,6 +332,7 @@ public final class GraphJobRunner<V exte
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
vertices.startSuperstep();
+ this.changedVertexCnt = 0;
IDSkippingIterator<V, E, M> skippingIterator = vertices.skippingIterator();
while (skippingIterator.hasNext()) {
Vertex<V, E, M> vertex = skippingIterator.next();
@@ -319,7 +342,7 @@ public final class GraphJobRunner<V exte
vertices.finishVertexComputation(vertex);
}
vertices.finishSuperstep();
- aggregationRunner.sendAggregatorValues(peer, 1);
+ aggregationRunner.sendAggregatorValues(peer, 1, this.changedVertexCnt);
iteration++;
}
@@ -426,6 +449,51 @@ public final class GraphJobRunner<V exte
}
/**
+ * Add new vertex into memory of each peer.
+ * @throws IOException
+ */
+ private void addVertex(Vertex<V, E, M> vertex) throws IOException {
+ vertex.runner = this;
+ vertex.setup(conf);
+
+ if (conf.getBoolean("hama.graph.self.ref", false)) {
+ vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
+ }
+
+ LOG.debug("Added VertexID: " + vertex.getVertexID() + " in peer " +
peer.getPeerName());
+ vertices.addVertex(vertex);
+ }
+
+ /**
+ * Remove vertex from this peer.
+ * @throws IOException
+ */
+ private void removeVertex(V vertexID) {
+ vertices.removeVertex(vertexID);
+ LOG.debug("Removed VertexID: " + vertexID + " in peer " +
peer.getPeerName());
+ }
+
+ /**
+ * After all inserts are done, we must finalize the VertexInfo data
structure.
+ * @throws IOException
+ */
+ private void finishAdditions() throws IOException {
+ vertices.finishAdditions();
+ // finish the "superstep" because we have written a new file here
+ vertices.finishSuperstep();
+ }
+
+ /**
+ * After all removals are done, we must finalize the VertexInfo data
structure.
+ * @throws IOException
+ */
+ private void finishRemovals() throws IOException {
+ vertices.finishRemovals();
+ // finish the "superstep" because we have written a new file here
+ vertices.finishSuperstep();
+ }
+
+ /**
* Counts vertices globally by sending the count of vertices in the map to
the
* other peers.
*/
@@ -461,6 +529,9 @@ public final class GraphJobRunner<V exte
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException, SyncException, InterruptedException {
GraphJobMessage msg = null;
+ boolean dynamicAdditions = false;
+ boolean dynamicRemovals = false;
+
while ((msg = peer.getCurrentMessage()) != null) {
// either this is a vertex message or a directive that must be read
// as map
@@ -486,6 +557,20 @@ public final class GraphJobRunner<V exte
&& vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) {
aggregationRunner.masterReadAggregatedIncrementalValue(vertexID,
(M) e.getValue());
+ } else if (FLAG_VERTEX_INCREASE.equals(vertexID)) {
+ dynamicAdditions = true;
+ addVertex((Vertex<V, E, M>) e.getValue());
+ } else if (FLAG_VERTEX_DECREASE.equals(vertexID)) {
+ dynamicRemovals = true;
+ removeVertex((V) e.getValue());
+ } else if (FLAG_VERTEX_TOTAL_VERTICES.equals(vertexID)) {
+ this.numberVertices = ((LongWritable) e.getValue()).get();
+ } else if (FLAG_VERTEX_ALTER_COUNTER.equals(vertexID)) {
+ if (isMasterTask(peer)) {
+
peer.getCounter(GraphJobCounter.INPUT_VERTICES).increment(((LongWritable)
e.getValue()).get());
+ } else {
+ throw new UnsupportedOperationException("A message to increase
vertex count is in a wrong place: " + peer);
+ }
}
}
@@ -494,6 +579,15 @@ public final class GraphJobRunner<V exte
}
}
+
+ // If we applied any changes to vertices, we need to call finishAdditions
and finishRemovals in the end.
+ if (dynamicAdditions) {
+ finishAdditions();
+ }
+ if (dynamicRemovals) {
+ finishRemovals();
+ }
+
return msg;
}
@@ -605,4 +699,12 @@ public final class GraphJobRunner<V exte
return (X) ReflectionUtils.newInstance(EDGE_VALUE_CLASS);
}
+ public int getChangedVertexCnt() {
+ return changedVertexCnt;
+ }
+
+ public void setChangedVertexCnt(int changedVertexCnt) {
+ this.changedVertexCnt = changedVertexCnt;
+ }
+
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java?rev=1514667&r1=1514666&r2=1514667&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
Fri Aug 16 12:03:59 2013
@@ -18,9 +18,9 @@
package org.apache.hama.graph;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
@@ -37,12 +37,24 @@ import org.apache.hama.bsp.TaskAttemptID
public final class ListVerticesInfo<V extends WritableComparable<V>, E extends
Writable, M extends Writable>
implements VerticesInfo<V, E, M> {
- private final List<Vertex<V, E, M>> vertices = new ArrayList<Vertex<V, E,
M>>(
- 100);
+ private final SortedSet<Vertex<V, E, M>> vertices = new TreeSet<Vertex<V, E,
M>>();
+ // We will use this variable to make vertex removals, so we don't invoke GC
too many times.
+ private final Vertex<V, E, M> vertexTemplate = GraphJobRunner.<V, E, M>
newVertexInstance(GraphJobRunner.VERTEX_CLASS);
@Override
public void addVertex(Vertex<V, E, M> vertex) {
- vertices.add(vertex);
+ if (!vertices.add(vertex)) {
+ throw new UnsupportedOperationException("Vertex with ID: " +
vertex.getVertexID() + " already exists!");
+ }
+ }
+
+ @Override
+ public void removeVertex(V vertexID) throws UnsupportedOperationException {
+ vertexTemplate.setVertexID(vertexID);
+
+ if (!vertices.remove(vertexTemplate)) {
+ throw new UnsupportedOperationException("Vertex with ID: " + vertexID +
" not found on this peer.");
+ }
}
public void clear() {
@@ -57,26 +69,40 @@ public final class ListVerticesInfo<V ex
@Override
public IDSkippingIterator<V, E, M> skippingIterator() {
return new IDSkippingIterator<V, E, M>() {
- int currentIndex = 0;
+ Iterator<Vertex<V, E, M>> it = vertices.iterator();
+ Vertex<V, E, M> v;
@Override
- public boolean hasNext(V e,
+ public boolean hasNext(V msgId,
org.apache.hama.graph.IDSkippingIterator.Strategy strat) {
- if (currentIndex < vertices.size()) {
- while (!strat.accept(vertices.get(currentIndex), e)) {
- currentIndex++;
+ if (it.hasNext()) {
+ v = it.next();
+
+ while (!strat.accept(v, msgId)) {
+ if (it.hasNext()) {
+ v = it.next();
+ } else {
+ return false;
+ }
}
return true;
} else {
+ v = null;
return false;
}
}
@Override
public Vertex<V, E, M> next() {
- return vertices.get(currentIndex++);
+ if (v == null) {
+ throw new UnsupportedOperationException("You must invoke hasNext
before ask for the next vertex.");
+ }
+
+ Vertex<V, E, M> tmp = v;
+ v = null;
+ return tmp;
}
};
@@ -89,7 +115,11 @@ public final class ListVerticesInfo<V ex
@Override
public void finishAdditions() {
- Collections.sort(vertices);
+
+ }
+
+ @Override
+ public void finishRemovals() {
}
@Override
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1514667&r1=1514666&r2=1514667&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Fri Aug 16
12:03:59 2013
@@ -25,6 +25,8 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hama.bsp.BSPPeer;
@@ -112,6 +114,42 @@ public abstract class Vertex<V extends W
new GraphJobMessage(destinationVertexID, msg));
}
+ private void alterVertexCounter(int i) throws IOException {
+ this.runner.setChangedVertexCnt(this.runner.getChangedVertexCnt() + i);
+ }
+
+ @Override
+ public void addVertex(V vertexID, List<Edge<V, E>> edges, M value) throws
IOException {
+ MapWritable msg = new MapWritable();
+ // Create the new vertex.
+ Vertex<V, E, M> vertex = GraphJobRunner.<V, E, M>
newVertexInstance(GraphJobRunner.VERTEX_CLASS);
+ vertex.setEdges(edges);
+ vertex.setValue(value);
+ vertex.setVertexID(vertexID);
+
+ msg.put(GraphJobRunner.FLAG_VERTEX_INCREASE, vertex);
+ // Find the proper partition to host the new vertex.
+ int partition = getPartitioner().getPartition(vertexID, value,
+ runner.getPeer().getNumPeers());
+ String destPeer = runner.getPeer().getAllPeerNames()[partition];
+
+ runner.getPeer().send(destPeer, new GraphJobMessage(msg));
+
+ alterVertexCounter(1);
+ }
+
+ @Override
+ public void remove() throws IOException {
+ MapWritable msg = new MapWritable();
+ msg.put(GraphJobRunner.FLAG_VERTEX_DECREASE, this.vertexID);
+
+ // Get master task peer.
+ String destPeer = GraphJobRunner.getMasterTask(this.getPeer());
+ runner.getPeer().send(destPeer, new GraphJobMessage(msg));
+
+ alterVertexCounter(-1);
+ }
+
@Override
public long getSuperstepCount() {
return runner.getNumberIterations();
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1514667&r1=1514666&r2=1514667&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
Fri Aug 16 12:03:59 2013
@@ -78,6 +78,16 @@ public interface VertexInterface<V exten
public void sendMessage(V destinationVertexID, M msg) throws IOException;
/**
+ * Sends a message to add a new vertex through the partitioner to the
appropriate BSP peer
+ */
+ public void addVertex(V vertexID, List<Edge<V, E>> edges, M value) throws
IOException;
+
+ /**
+ * Removes current Vertex from local peer.
+ */
+ public void remove() throws IOException;
+
+ /**
* @return the superstep number of the current superstep (starting from 0).
*/
public long getSuperstepCount();
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java?rev=1514667&r1=1514666&r2=1514667&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java Fri
Aug 16 12:03:59 2013
@@ -52,12 +52,23 @@ public interface VerticesInfo<V extends
public void addVertex(Vertex<V, E, M> vertex) throws IOException;
/**
+ * Remove a vertex from the underlying structure.
+ */
+ public void removeVertex(V vertexID) throws UnsupportedOperationException;
+
+ /**
* Finish the additions, from this point on the implementations should close
* the adds and throw exceptions in case something is added after this call.
*/
public void finishAdditions();
/**
+ * Finish the removals, from this point on the implementations should close
+ * the removes and throw exceptions in case something is removed after this
call.
+ */
+ public void finishRemovals();
+
+ /**
* Called once a superstep starts.
*/
public void startSuperstep() throws IOException;