Author: edwardyoon
Date: Thu Jan 9 01:10:59 2014
New Revision: 1556691
URL: http://svn.apache.org/r1556691
Log:
HAMA-838: Refactor aggregators
Added:
hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1556691&r1=1556690&r2=1556691&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Jan 9 01:10:59 2014
@@ -20,6 +20,7 @@ Release 0.7.0 (unreleased changes)
IMPROVEMENTS
+ HAMA-838: Refactor aggregators (Anastasis Andronidis)
HAMA-783: Improve the InMemory verticesInfo implementations (edwardyoon)
HAMA-829: Improve code and fix Javadoc warnings in org.apache.hama.pipes
(Martin Illecker)
HAMA-808: Hama Pipes Testcase (Martin Illecker)
Modified:
hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1556691&r1=1556690&r2=1556691&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
(original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
Thu Jan 9 01:10:59 2014
@@ -40,6 +40,7 @@ import org.apache.hama.graph.VertexInput
* Real pagerank with dangling node contribution.
*/
public class PageRank {
+ private static final String AVG_AGGREGATOR = "average.aggregator";
public static class PageRankVertex extends
Vertex<Text, NullWritable, DoubleWritable> {
@@ -63,29 +64,29 @@ public class PageRank {
public void compute(Iterable<DoubleWritable> messages) throws IOException {
// initialize this vertex to 1 / count of global vertices in this graph
if (this.getSuperstepCount() == 0) {
- this.setValue(new DoubleWritable(1.0 / this.getNumVertices()));
+ setValue(new DoubleWritable(1.0 / this.getNumVertices()));
} else if (this.getSuperstepCount() >= 1) {
double sum = 0;
for (DoubleWritable msg : messages) {
sum += msg.get();
}
double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
- this.setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
+ setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
+ aggregate(AVG_AGGREGATOR, this.getValue());
}
// if we have not reached our global error yet, then proceed.
- DoubleWritable globalError = getLastAggregatedValue(0);
+ DoubleWritable globalError = (DoubleWritable)
getAggregatedValue(AVG_AGGREGATOR);
+
if (globalError != null && this.getSuperstepCount() > 2
&& MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
voteToHalt();
- return;
+ } else {
+ // in each superstep we are going to send a new rank to our neighbours
+ sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
+ / this.getEdges().size()));
}
-
- // in each superstep we are going to send a new rank to our neighbours
- sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
- / this.getEdges().size()));
}
-
}
public static class PagerankSeqReader
@@ -126,7 +127,7 @@ public class PageRank {
}
// error
- pageJob.setAggregatorClass(AverageAggregator.class);
+ pageJob.registerAggregator(AVG_AGGREGATOR, AverageAggregator.class);
// Vertex reader
pageJob.setVertexInputReaderClass(PagerankSeqReader.class);
Added:
hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java?rev=1556691&view=auto
==============================================================================
---
hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java
(added)
+++
hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java
Thu Jan 9 01:10:59 2014
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.bsp.TextOutputFormat;
+import org.apache.hama.graph.GraphJob;
+import org.apache.hama.graph.SumAggregator;
+import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexInputReader;
+import org.junit.Test;
+
+/**
+ * Unit test for custom aggregators that are registered by name through
+ * {@link GraphJob#registerAggregator(String, Class)}.
+ */
+public class AggregatorsTest extends TestCase {
+  private static final String OUTPUT = "/tmp/page-out";
+
+  /** Expected final vertex values as { vertex ID, value } pairs. */
+  private static final String[][] EXPECTED = { { "1", "6.0" },
+      { "2", "2.0" }, { "3", "3.0" }, { "4", "4.0" } };
+
+  private final Configuration conf = new HamaConfiguration();
+  private FileSystem fs;
+
+  /** Best-effort removal of the job output directory. */
+  private void deleteTempDirs() {
+    try {
+      if (fs.exists(new Path(OUTPUT)))
+        fs.delete(new Path(OUTPUT), true);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Reads every output part file and checks the final values of vertices
+   * 1-4. Results are gathered across all part files, so the check does not
+   * depend on the number of tasks or on the order in which vertices were
+   * written.
+   */
+  private void verifyResult() throws IOException {
+    FileStatus[] globStatus = fs.globStatus(new Path(OUTPUT + "/part-*"));
+    // Without these checks a job that wrote no output would pass silently.
+    assertNotNull("no output found under " + OUTPUT, globStatus);
+    assertTrue("no part files found under " + OUTPUT, globStatus.length > 0);
+
+    Map<String, String> values = new HashMap<String, String>();
+    for (FileStatus fts : globStatus) {
+      BufferedReader reader = new BufferedReader(new InputStreamReader(
+          fs.open(fts.getPath())));
+      try {
+        String line;
+        while ((line = reader.readLine()) != null) {
+          // Each line is "<vertex id>\t<vertex value>".
+          String[] split = line.split("\t");
+          values.put(split[0], split[1]);
+        }
+      } finally {
+        reader.close();
+      }
+    }
+
+    for (String[] expected : EXPECTED) {
+      assertEquals("value of vertex " + expected[0], expected[1],
+          values.get(expected[0]));
+    }
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    fs = FileSystem.get(conf);
+  }
+
+  @Test
+  public void test() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    try {
+      CustomAggregators
+          .main(new String[] { "src/test/resources/dg.txt", OUTPUT });
+      verifyResult();
+    } finally {
+      deleteTempDirs();
+    }
+  }
+
+  /**
+   * Small graph job that exercises a {@link SumAggregator} registered under
+   * the name "mySum".
+   */
+  static class CustomAggregators {
+
+    /** Reads each input line as a vertex whose ID and value are that line. */
+    public static class GraphTextReader extends
+        VertexInputReader<LongWritable, Text, Text, NullWritable, DoubleWritable> {
+
+      @Override
+      public boolean parseVertex(LongWritable key, Text value,
+          Vertex<Text, NullWritable, DoubleWritable> vertex) throws Exception {
+        vertex.setVertexID(value);
+        vertex.setValue(new DoubleWritable(Double.parseDouble(value.toString())));
+        return true;
+      }
+    }
+
+    /**
+     * Vertex that feeds the "mySum" aggregator on supersteps 2 and 4 and
+     * copies the aggregated results into selected vertices so that they appear
+     * in the job output.
+     */
+    public static class GraphVertex extends
+        Vertex<Text, NullWritable, DoubleWritable> {
+
+      @Override
+      public void compute(Iterable<DoubleWritable> msgs) throws IOException {
+        // Every active vertex contributes to the custom aggregator on
+        // supersteps 2 and 4 only.
+        if (this.getSuperstepCount() == 2) {
+          this.aggregate("mySum", new DoubleWritable(1.0));
+        }
+
+        if (this.getSuperstepCount() == 4) {
+          this.aggregate("mySum", new DoubleWritable(2.0));
+        }
+
+        // The first aggregation result is read on superstep 3 and stored
+        // only in vertex 4. This vertex should have value = 4.
+        if (this.getSuperstepCount() == 3
+            && this.getVertexID().toString().equals("4")) {
+          this.setValue((DoubleWritable) this.getAggregatedValue("mySum"));
+        }
+
+        // Halting vertex 3 changes the aggregated results of the following
+        // supersteps, in both custom and global aggregators.
+        // This vertex should keep value = 3.
+        if (this.getSuperstepCount() == 3
+            && this.getVertexID().toString().equals("3")) {
+          this.voteToHalt();
+        }
+
+        // This vertex should have value = 6 (3 working vertices x the custom
+        // value 2).
+        if (this.getSuperstepCount() == 5
+            && this.getVertexID().toString().equals("1")) {
+          this.setValue((DoubleWritable) this.getAggregatedValue("mySum"));
+        }
+
+        if (this.getSuperstepCount() == 6) {
+          this.voteToHalt();
+        }
+      }
+    }
+
+    public static void main(String[] args) throws IOException,
+        InterruptedException, ClassNotFoundException {
+      if (args.length != 2) {
+        printUsage();
+      }
+      HamaConfiguration conf = new HamaConfiguration(new Configuration());
+      GraphJob graphJob = createJob(args, conf);
+      long startTime = System.currentTimeMillis();
+      if (graphJob.waitForCompletion(true)) {
+        System.out.println("Job Finished in "
+            + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+      }
+    }
+
+    private static void printUsage() {
+      System.out.println("Usage: <input> <output>");
+      System.exit(-1);
+    }
+
+    private static GraphJob createJob(String[] args, HamaConfiguration conf)
+        throws IOException {
+      GraphJob graphJob = new GraphJob(conf, CustomAggregators.class);
+      graphJob.setJobName("Custom Aggregators");
+      graphJob.setVertexClass(GraphVertex.class);
+
+      graphJob.registerAggregator("mySum", SumAggregator.class);
+
+      graphJob.setInputPath(new Path(args[0]));
+      graphJob.setOutputPath(new Path(args[1]));
+
+      graphJob.setVertexIDClass(Text.class);
+      graphJob.setVertexValueClass(DoubleWritable.class);
+      graphJob.setEdgeValueClass(NullWritable.class);
+
+      graphJob.setInputFormat(TextInputFormat.class);
+
+      graphJob.setVertexInputReaderClass(GraphTextReader.class);
+      graphJob.setPartitioner(HashPartitioner.class);
+
+      graphJob.setOutputFormat(TextOutputFormat.class);
+      graphJob.setOutputKeyClass(Text.class);
+      graphJob.setOutputValueClass(DoubleWritable.class);
+
+      return graphJob;
+    }
+  }
+}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java?rev=1556691&r1=1556690&r2=1556691&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
Thu Jan 9 01:10:59 2014
@@ -17,22 +17,20 @@
*/
package org.apache.hama.graph;
-import java.io.IOException;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.sync.SyncException;
-
-import com.google.common.base.Preconditions;
/**
* Runner class to do the tasks that need to be done if aggregation was
@@ -41,117 +39,31 @@ import com.google.common.base.Preconditi
*/
@SuppressWarnings("rawtypes")
public final class AggregationRunner<V extends WritableComparable, E extends
Writable, M extends Writable> {
+ private Map<String, Aggregator> Aggregators;
+ private Map<String, Writable> aggregatorResults;
+ private Set<String> aggregatorsUsed;
- // multiple aggregator arrays
- private Aggregator<M, Vertex<V, E, M>>[] aggregators;
- private Set<Integer> skipAggregators;
- private Writable[] globalAggregatorResult;
- private IntWritable[] globalAggregatorIncrement;
- private boolean[] isAbstractAggregator;
- private String[] aggregatorClassNames;
- private Text[] aggregatorValueFlag;
- private Text[] aggregatorIncrementFlag;
- // aggregator on the master side
- private Aggregator<M, Vertex<V, E, M>>[] masterAggregator;
-
- private boolean enabled = false;
private Configuration conf;
+ private Text textWrap = new Text();
- @SuppressWarnings("unchecked")
public void setupAggregators(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
this.conf = peer.getConfiguration();
- String aggregatorClasses = peer.getConfiguration().get(
+
+ this.aggregatorResults = new HashMap<String, Writable>(4);
+ this.Aggregators = new HashMap<String, Aggregator>(4);
+ this.aggregatorsUsed = new HashSet<String>(4);
+
+ String customAggregatorClasses = peer.getConfiguration().get(
GraphJob.AGGREGATOR_CLASS_ATTR);
- this.skipAggregators = new HashSet<Integer>();
- if (aggregatorClasses != null) {
- enabled = true;
- aggregatorClassNames = aggregatorClasses.split(";");
- // init to the split size
- aggregators = new Aggregator[aggregatorClassNames.length];
- globalAggregatorResult = new Writable[aggregatorClassNames.length];
- globalAggregatorIncrement = new IntWritable[aggregatorClassNames.length];
- isAbstractAggregator = new boolean[aggregatorClassNames.length];
- aggregatorValueFlag = new Text[aggregatorClassNames.length];
- aggregatorIncrementFlag = new Text[aggregatorClassNames.length];
- if (GraphJobRunner.isMasterTask(peer)) {
- masterAggregator = new Aggregator[aggregatorClassNames.length];
- }
- for (int i = 0; i < aggregatorClassNames.length; i++) {
- aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
- aggregatorValueFlag[i] = new Text(
- GraphJobRunner.S_FLAG_AGGREGATOR_VALUE + ";" + i);
- aggregatorIncrementFlag[i] = new Text(
- GraphJobRunner.S_FLAG_AGGREGATOR_INCREMENT + ";" + i);
- if (aggregators[i] instanceof AbstractAggregator) {
- isAbstractAggregator[i] = true;
- }
- if (GraphJobRunner.isMasterTask(peer)) {
- masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
- }
- }
- }
- }
- /**
- * Runs the aggregators by sending their values to the master task.
- * @param changedVertexCnt
- */
- public void sendAggregatorValues(
- BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
- int activeVertices, int changedVertexCnt) throws IOException {
- // send msgCounts to the master task
- MapWritable updatedCnt = new MapWritable();
- updatedCnt.put(GraphJobRunner.FLAG_MESSAGE_COUNTS, new IntWritable(
- activeVertices));
- // send total number of vertices changes
- updatedCnt.put(GraphJobRunner.FLAG_VERTEX_ALTER_COUNTER, new LongWritable(
- changedVertexCnt));
- // also send aggregated values to the master
- if (aggregators != null) {
- for (int i = 0; i < this.aggregators.length; i++) {
- if (!this.skipAggregators.contains(i)) {
- updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue());
- if (isAbstractAggregator[i]) {
- updatedCnt.put(aggregatorIncrementFlag[i],
- ((AbstractAggregator<M, Vertex<V, E, M>>) aggregators[i])
- .getTimesAggregated());
- }
- }
- }
- for (int i = 0; i < aggregators.length; i++) {
- if (!this.skipAggregators.contains(i)) {
- // now create new aggregators for the next iteration
- aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
- if (GraphJobRunner.isMasterTask(peer)) {
- masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
- }
- }
- }
- }
- peer.send(GraphJobRunner.getMasterTask(peer), new GraphJobMessage(
- updatedCnt));
- }
+ if (customAggregatorClasses != null) {
+ String[] custAggrs = customAggregatorClasses.split(";");
- /**
- * Aggregates the last value before computation and the value after the
- * computation.
- *
- * @param lastValue the value before compute().
- * @param v the vertex.
- */
- public void aggregateVertex(M lastValue, Vertex<V, E, M> v) {
- if (isEnabled()) {
- for (int i = 0; i < this.aggregators.length; i++) {
- if (!this.skipAggregators.contains(i)) {
- Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
- aggregator.aggregate(v, v.getValue());
- if (isAbstractAggregator[i]) {
- AbstractAggregator<M, Vertex<V, E, M>> intern =
(AbstractAggregator<M, Vertex<V, E, M>>) aggregator;
- intern.aggregate(v, lastValue, v.getValue());
- intern.aggregateInternal();
- }
- }
+ for (String aggr : custAggrs) {
+ String[] Name_AggrClass = aggr.split("@", 2);
+ this.Aggregators.put(Name_AggrClass[0],
+ getNewAggregator(Name_AggrClass[1]));
}
}
}
@@ -161,26 +73,20 @@ public final class AggregationRunner<V e
* peer and updates the given map accordingly.
*/
public void doMasterAggregation(MapWritable updatedCnt) {
- if (isEnabled()) {
- // work through the master aggregators
- for (int i = 0; i < masterAggregator.length; i++) {
- if (!this.skipAggregators.contains(i)) {
- Writable lastAggregatedValue = masterAggregator[i].getValue();
- if (isAbstractAggregator[i]) {
- final AbstractAggregator<M, Vertex<V, E, M>> intern =
((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[i]);
- final Writable finalizeAggregation = intern.finalizeAggregation();
- if (intern.finalizeAggregation() != null) {
- lastAggregatedValue = finalizeAggregation;
- }
- // this count is usually the times of active
- // vertices in the graph
- updatedCnt.put(aggregatorIncrementFlag[i],
- intern.getTimesAggregated());
- }
- updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue);
- }
- }
+ // Get results only from used aggregators.
+ for (String name : this.aggregatorsUsed) {
+ updatedCnt.put(new Text(name), this.Aggregators.get(name).getValue());
+ }
+ this.aggregatorsUsed.clear();
+
+ // Reset all custom aggregators. TODO: Change the aggregation interface to
+ // include clean() method.
+ Map<String, Aggregator> tmp = new HashMap<String, Aggregator>(4);
+ for (Entry<String, Aggregator> e : this.Aggregators.entrySet()) {
+ String aggClass = e.getValue().getClass().getName();
+ tmp.put(e.getKey(), getNewAggregator(aggClass));
}
+ this.Aggregators = tmp;
}
/**
@@ -190,13 +96,20 @@ public final class AggregationRunner<V e
* we haven't seen any messages anymore.
*/
public boolean receiveAggregatedValues(MapWritable updatedValues,
- long iteration) throws IOException, SyncException, InterruptedException {
- // map is the first value that is in the queue
- for (int i = 0; i < aggregators.length; i++) {
- globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]);
- globalAggregatorIncrement[i] = (IntWritable) updatedValues
- .get(aggregatorIncrementFlag[i]);
+ long iteration) {
+ // In every superstep, we create a new result collection because we do not
+ // keep a history of earlier results.
+ // If a value is missing, the user gets a null result.
+ // Creating a new collection every time is meant to reduce network cost,
+ // since null values are skipped and less information is sent.
+ // The trade-off is more work for the garbage collector, as a new map is
+ // allocated on every superstep.
+ this.aggregatorResults = new HashMap<String, Writable>(4);
+ for (String name : this.Aggregators.keySet()) {
+ this.textWrap.set(name);
+ this.aggregatorResults.put(name, updatedValues.get(textWrap));
}
+
IntWritable count = (IntWritable) updatedValues
.get(GraphJobRunner.FLAG_MESSAGE_COUNTS);
if (count != null && count.get() == Integer.MIN_VALUE) {
@@ -206,47 +119,16 @@ public final class AggregationRunner<V e
}
/**
- * @return true if aggregators were defined. Normally used by the internal
- * stateful methods, outside shouldn't use it too extensively.
+ * Method to let the custom master aggregator read messages from peers and
+ * aggregate a value.
*/
- public boolean isEnabled() {
- return enabled;
- }
-
- /**
- * Method to let the master read messages from peers and aggregate a value.
- */
- public void masterReadAggregatedValue(Text textIndex, M value) {
- int index = Integer.parseInt(textIndex.toString().split(";")[1]);
- masterAggregator[index].aggregate(null, value);
- }
-
- /**
- * Method to let the master read messages from peers and aggregate the
- * incremental value.
- */
- public void masterReadAggregatedIncrementalValue(Text textIndex, M value) {
- int index = Integer.parseInt(textIndex.toString().split(";")[1]);
- if (isAbstractAggregator[index]) {
- ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[index])
- .addTimesAggregated(((IntWritable) value).get());
- }
- }
+ @SuppressWarnings("unchecked")
+ public void masterAggregation(Text name, Writable value) {
+ String nameIdx = name.toString().split(";", 2)[1];
+ this.Aggregators.get(nameIdx).aggregate(null, value);
- /**
- * This method adds an id of an aggregator that will be skipped in the
current
- * superstep.
- */
- public void addSkipAggregator(int index) {
- this.skipAggregators.add(index);
- }
-
- /**
- * This method adds an id of an aggregator that will be skipped in the
current
- * superstep.
- */
- void resetSkipAggregators() {
- this.skipAggregators.clear();
+ // When it's time to send the values, we can see which aggregators are used.
+ this.aggregatorsUsed.add(nameIdx);
}
@SuppressWarnings("unchecked")
@@ -261,13 +143,7 @@ public final class AggregationRunner<V e
+ " could not be found or instantiated!");
}
- public final Writable getLastAggregatedValue(int index) {
- return globalAggregatorResult[Preconditions.checkPositionIndex(index,
- globalAggregatorResult.length)];
- }
-
- public final IntWritable getNumLastAggregatedVertices(int index) {
- return globalAggregatorIncrement[Preconditions.checkPositionIndex(index,
- globalAggregatorIncrement.length)];
+ public final Writable getAggregatedValue(String name) {
+ return this.aggregatorResults.get(name);
}
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1556691&r1=1556690&r2=1556691&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Thu Jan
9 01:10:59 2014
@@ -102,23 +102,20 @@ public class GraphJob extends BSPJob {
}
/**
- * Set the aggregator for the job.
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public void setAggregatorClass(Class<? extends Aggregator> cls) {
- this.setAggregatorClass(new Class[] { cls });
- }
-
- /**
- * Sets multiple aggregators for the job.
- */
+ * Custom aggregator registration. Add a custom aggregator
+ * that will aggregate messages sent from the user.
+ *
+ * @param name identifies an aggregator
+ * @param aggregatorClass the aggregator class
+ */
@SuppressWarnings("rawtypes")
- public void setAggregatorClass(Class<? extends Aggregator>... cls) {
- String classNames = "";
- for (Class<? extends Aggregator> cl : cls) {
- classNames += cl.getName() + ";";
- }
- conf.set(AGGREGATOR_CLASS_ATTR, classNames);
+ public void registerAggregator(String name, Class<? extends
+ Aggregator> aggregatorClass) {
+ String prevAggrs = this.conf.get(AGGREGATOR_CLASS_ATTR, "");
+
+ prevAggrs += name + "@" + aggregatorClass.getName() + ";";
+
+ this.conf.set(AGGREGATOR_CLASS_ATTR, prevAggrs);
}
/**
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1556691&r1=1556690&r2=1556691&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
Thu Jan 9 01:10:59 2014
@@ -92,7 +92,6 @@ public final class GraphJobMessage imple
} else {
vertexId.write(out);
}
-
}
public void fastReadFields(DataInput in) throws IOException {
@@ -217,6 +216,7 @@ public final class GraphJobMessage imple
buffer = new DataInputBuffer();
}
+ @Override
public synchronized int compare(byte[] b1, int s1, int l1, byte[] b2,
int s2, int l2) {
try {
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1556691&r1=1556690&r2=1556691&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Thu Jan 9 01:10:59 2014
@@ -64,12 +64,11 @@ public final class GraphJobRunner<V exte
// make sure that these values don't collide with the vertex names
public static final String S_FLAG_MESSAGE_COUNTS = "hama.0";
public static final String S_FLAG_AGGREGATOR_VALUE = "hama.1";
- public static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2";
- public static final String S_FLAG_VERTEX_INCREASE = "hama.3";
- public static final String S_FLAG_VERTEX_DECREASE = "hama.4";
- public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.5";
- public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.6";
- public static final String S_FLAG_AGGREGATOR_SKIP = "hama.7";
+ public static final String S_FLAG_VERTEX_INCREASE = "hama.2";
+ public static final String S_FLAG_VERTEX_DECREASE = "hama.3";
+ public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.4";
+ public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.5";
+
public static final Text FLAG_MESSAGE_COUNTS = new
Text(S_FLAG_MESSAGE_COUNTS);
public static final Text FLAG_VERTEX_INCREASE = new Text(
S_FLAG_VERTEX_INCREASE);
@@ -79,8 +78,6 @@ public final class GraphJobRunner<V exte
S_FLAG_VERTEX_ALTER_COUNTER);
public static final Text FLAG_VERTEX_TOTAL_VERTICES = new Text(
S_FLAG_VERTEX_TOTAL_VERTICES);
- public static final Text FLAG_AGGREGATOR_SKIP = new Text(
- S_FLAG_AGGREGATOR_SKIP);
public static final String MESSAGE_COMBINER_CLASS_KEY =
"hama.vertex.message.combiner.class";
public static final String VERTEX_CLASS_KEY = "hama.graph.vertex.class";
@@ -183,22 +180,10 @@ public final class GraphJobRunner<V exte
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException, SyncException, InterruptedException {
- if (isMasterTask(peer) && iteration == 1) {
- MapWritable updatedCnt = new MapWritable();
- updatedCnt.put(
- FLAG_VERTEX_TOTAL_VERTICES,
- new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES)
- .getCounter())));
- // send the updates from the master tasks back to the slaves
- for (String peerName : peer.getAllPeerNames()) {
- peer.send(peerName, new GraphJobMessage(updatedCnt));
- }
- }
-
- // this is only done in every second iteration
- if (isMasterTask(peer) && iteration > 1) {
+ // This runs only on the master task
+ if (isMasterTask(peer)) {
MapWritable updatedCnt = new MapWritable();
- // send total number of vertices.
+ // send total number of vertices
updatedCnt.put(
FLAG_VERTEX_TOTAL_VERTICES,
new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES)
@@ -214,26 +199,26 @@ public final class GraphJobRunner<V exte
peer.send(peerName, new GraphJobMessage(updatedCnt));
}
}
- if (getAggregationRunner().isEnabled() && iteration > 1) {
- // in case we need to sync, we need to replay the messages that already
- // are added to the queue. This prevents loosing messages when using
- // aggregators.
- if (firstVertexMessage != null) {
- peer.send(peer.getPeerName(), firstVertexMessage);
- }
- GraphJobMessage msg = null;
- while ((msg = peer.getCurrentMessage()) != null) {
- peer.send(peer.getPeerName(), msg);
- }
- // now sync
- peer.sync();
- // now the map message must be read that might be send from the master
- updated = getAggregationRunner().receiveAggregatedValues(
- peer.getCurrentMessage().getMap(), iteration);
- // set the first vertex message back to the message it had before sync
- firstVertexMessage = peer.getCurrentMessage();
+
+ // in case we need to sync, we need to replay the messages that already
+ // are added to the queue. This prevents losing messages when using
+ // aggregators.
+ if (firstVertexMessage != null) {
+ peer.send(peer.getPeerName(), firstVertexMessage);
}
- this.aggregationRunner.resetSkipAggregators();
+
+ GraphJobMessage msg = null;
+ while ((msg = peer.getCurrentMessage()) != null) {
+ peer.send(peer.getPeerName(), msg);
+ }
+
+ // now sync
+ peer.sync();
+ // now read the map message that might have been sent from the master
+ updated = getAggregationRunner().receiveAggregatedValues(
+ peer.getCurrentMessage().getMap(), iteration);
+ // set the first vertex message back to the message it had before sync
+ firstVertexMessage = peer.getCurrentMessage();
return firstVertexMessage;
}
@@ -274,7 +259,6 @@ public final class GraphJobRunner<V exte
}
if (!vertex.isHalted()) {
- M lastValue = vertex.getValue();
if (iterable == null) {
vertex.compute(Collections.<M> emptyList());
} else {
@@ -286,7 +270,6 @@ public final class GraphJobRunner<V exte
}
currentMessage = iterable.getOverflowMessage();
}
- getAggregationRunner().aggregateVertex(lastValue, vertex);
activeVertices++;
}
@@ -296,8 +279,7 @@ public final class GraphJobRunner<V exte
}
vertices.finishSuperstep();
- getAggregationRunner().sendAggregatorValues(peer, activeVertices,
- this.changedVertexCnt);
+ sendControllValues(activeVertices, this.changedVertexCnt);
iteration++;
}
@@ -357,15 +339,13 @@ public final class GraphJobRunner<V exte
while (skippingIterator.hasNext()) {
Vertex<V, E, M> vertex = skippingIterator.next();
- M lastValue = vertex.getValue();
// Calls setup method.
vertex.setup(conf);
vertex.compute(Collections.singleton(vertex.getValue()));
- getAggregationRunner().aggregateVertex(lastValue, vertex);
vertices.finishVertexComputation(vertex);
}
vertices.finishSuperstep();
- getAggregationRunner().sendAggregatorValues(peer, 1,
this.changedVertexCnt);
+ sendControllValues(1, this.changedVertexCnt);
iteration++;
}
@@ -594,14 +574,10 @@ public final class GraphJobRunner<V exte
} else {
globalUpdateCounts += ((IntWritable) e.getValue()).get();
}
- } else if (getAggregationRunner().isEnabled()
- && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
- getAggregationRunner().masterReadAggregatedValue(vertexID,
- (M) e.getValue());
- } else if (getAggregationRunner().isEnabled()
- && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) {
- getAggregationRunner().masterReadAggregatedIncrementalValue(
- vertexID, (M) e.getValue());
+
+ } else if (vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
+ this.getAggregationRunner().masterAggregation(vertexID,
+ e.getValue());
} else if (FLAG_VERTEX_INCREASE.equals(vertexID)) {
dynamicAdditions = true;
addVertex((Vertex<V, E, M>) e.getValue());
@@ -619,21 +595,11 @@ public final class GraphJobRunner<V exte
"A message to increase vertex count is in a wrong place: "
+ peer);
}
- } else if (FLAG_AGGREGATOR_SKIP.equals(vertexID)) {
- if (isMasterTask(peer)) {
- this.getAggregationRunner().addSkipAggregator(
- ((IntWritable) e.getValue()).get());
- } else {
- throw new UnsupportedOperationException(
- "A message to skip aggregators is in a wrong peer: " + peer);
- }
}
}
-
} else {
throw new UnsupportedOperationException("Unknown message type: " +
msg);
}
-
}
// If we applied any changes to vertices, we need to call finishAdditions
@@ -677,23 +643,20 @@ public final class GraphJobRunner<V exte
}
/**
- * Gets the last aggregated value at the given index. The index is dependend
- * on how the aggregators were configured during job setup phase.
- *
- * @return the value of the aggregator, or null if none was defined.
- */
- public final Writable getLastAggregatedValue(int index) {
- return getAggregationRunner().getLastAggregatedValue(index);
- }
-
- /**
- * Gets the last aggregated number of vertices at the given index. The index
- * is dependend on how the aggregators were configured during job setup
phase.
+ * Sends this peer's control counters (active vertices, vertex-count delta) to the master task.
*
- * @return the value of the aggregator, or null if none was defined.
+ * @param activeVertices number of active vertices in this peer
+ * @param changedVertexCnt net vertex-count change (+1 per addVertex, -1 per remove)
*/
- public final IntWritable getNumLastAggregatedVertices(int index) {
- return getAggregationRunner().getNumLastAggregatedVertices(index);
+ private void sendControllValues(int activeVertices, int changedVertexCnt)
+ throws IOException {
+ // report the active-vertex count to the master under FLAG_MESSAGE_COUNTS
+ MapWritable updatedCnt = new MapWritable();
+ updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(activeVertices));
+ // report the net number of vertices added/removed under FLAG_VERTEX_ALTER_COUNTER
+ updatedCnt.put(FLAG_VERTEX_ALTER_COUNTER,
+ new LongWritable(changedVertexCnt));
+ peer.send(getMasterTask(peer), new GraphJobMessage(updatedCnt));
}
/**
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java?rev=1556691&r1=1556690&r2=1556691&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java Thu Jan 9 01:10:59 2014
@@ -82,6 +82,7 @@ public class OffHeapVerticesInfo<V exten
vertices.dump();
}
+ @Override
public void addVertex(Vertex<V, E, M> vertex) {
vertices.put(vertex.getVertexID(), vertex);
}
@@ -108,6 +109,7 @@ public class OffHeapVerticesInfo<V exten
vertices.clear();
}
+ @Override
public int size() {
return (int) this.vertices.entries();
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1556691&r1=1556690&r2=1556691&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Thu Jan 9 01:10:59 2014
@@ -27,8 +27,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hama.HamaConfiguration;
@@ -75,7 +75,7 @@ public abstract class Vertex<V extends W
@Override
public void setup(HamaConfiguration conf) {
}
-
+
@Override
public void sendMessage(Edge<V, E> e, M msg) throws IOException {
runner.getPeer().send(getDestinationPeerName(e),
@@ -120,24 +120,26 @@ public abstract class Vertex<V extends W
private void alterVertexCounter(int i) throws IOException {
this.runner.setChangedVertexCnt(this.runner.getChangedVertexCnt() + i);
}
-
+
@Override
- public void addVertex(V vertexID, List<Edge<V, E>> edges, M value) throws
IOException {
+ public void addVertex(V vertexID, List<Edge<V, E>> edges, M value)
+ throws IOException {
MapWritable msg = new MapWritable();
// Create the new vertex.
- Vertex<V, E, M> vertex = GraphJobRunner.<V, E, M>
newVertexInstance(GraphJobRunner.VERTEX_CLASS);
+ Vertex<V, E, M> vertex = GraphJobRunner
+ .<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
vertex.setEdges(edges);
vertex.setValue(value);
vertex.setVertexID(vertexID);
-
+
msg.put(GraphJobRunner.FLAG_VERTEX_INCREASE, vertex);
// Find the proper partition to host the new vertex.
- int partition = getPartitioner().getPartition(vertexID, value,
+ int partition = getPartitioner().getPartition(vertexID, value,
runner.getPeer().getNumPeers());
String destPeer = runner.getPeer().getAllPeerNames()[partition];
-
+
runner.getPeer().send(destPeer, new GraphJobMessage(msg));
-
+
alterVertexCounter(1);
}
@@ -145,11 +147,11 @@ public abstract class Vertex<V extends W
public void remove() throws IOException {
MapWritable msg = new MapWritable();
msg.put(GraphJobRunner.FLAG_VERTEX_DECREASE, this.vertexID);
-
+
// Get master task peer.
String destPeer = GraphJobRunner.getMasterTask(this.getPeer());
runner.getPeer().send(destPeer, new GraphJobMessage(msg));
-
+
alterVertexCounter(-1);
}
@@ -192,31 +194,6 @@ public abstract class Vertex<V extends W
return runner.getMaxIteration();
}
- /**
- * Get the last aggregated value of the defined aggregator, null if nothing
- * was configured or not returned a result. You have to supply an index, the
- * index is defined by the order you set the aggregator classes in
- * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
- * so if you have a single aggregator you can retrieve it via
- * {@link #getLastAggregatedValue}(0).
- */
- @SuppressWarnings("unchecked")
- public M getLastAggregatedValue(int index) {
- return (M) runner.getLastAggregatedValue(index);
- }
-
- /**
- * Get the number of aggregated vertices in the last superstep. Or null if no
- * aggregator is available.You have to supply an index, the index is defined
- * by the order you set the aggregator classes in
- * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
- * so if you have a single aggregator you can retrieve it via
- * {@link #getNumLastAggregatedVertices}(0).
- */
- public IntWritable getNumLastAggregatedVertices(int index) {
- return runner.getNumLastAggregatedVertices(index);
- }
-
public int getNumPeers() {
return runner.getPeer().getNumPeers();
}
@@ -245,21 +222,6 @@ public abstract class Vertex<V extends W
this.votedToHalt = true;
}
- /**
- * Disable an aggregator for the next superstep. The returning value of
- * the aggregator will be null.
- */
- public void skipAggregator(int index) throws IOException {
- MapWritable msg = new MapWritable();
- msg.put(GraphJobRunner.FLAG_AGGREGATOR_SKIP, new IntWritable(index));
-
- this.runner.getAggregationRunner().addSkipAggregator(index);
-
- // Get master task peer.
- String destPeer = GraphJobRunner.getMasterTask(this.getPeer());
- runner.getPeer().send(destPeer, new GraphJobMessage(msg));
- }
-
void setActive() {
this.votedToHalt = false;
}
@@ -314,7 +276,7 @@ public abstract class Vertex<V extends W
}
this.value.readFields(in);
}
-
+
this.edges = new ArrayList<Edge<V, E>>();
if (in.readBoolean()) {
int num = in.readInt();
@@ -345,7 +307,7 @@ public abstract class Vertex<V extends W
out.writeBoolean(true);
vertexID.write(out);
}
-
+
if (value == null) {
out.writeBoolean(false);
} else {
@@ -399,6 +361,30 @@ public abstract class Vertex<V extends W
}
+ /**
+ * Provides a value to the specified aggregator by sending it to the master
+ * task under the key S_FLAG_AGGREGATOR_VALUE + ";" + name.
+ *
+ * @param name identifies an aggregator
+ * @param value value to be aggregated
+ * @throws IOException propagated from sending the message to the master task
+ */
+ @Override
+ public void aggregate(String name, M value) throws IOException {
+ MapWritable msg = new MapWritable();
+ msg.put(new Text(GraphJobRunner.S_FLAG_AGGREGATOR_VALUE + ";" + name),
+ value);
+
+ // Get master task peer.
+ String destPeer = GraphJobRunner.getMasterTask(this.getPeer());
+ runner.getPeer().send(destPeer, new GraphJobMessage(msg));
+ }
+
+ @Override
+ public Writable getAggregatedValue(String name) {
+ return this.runner.getAggregationRunner().getAggregatedValue(name);
+ }
+
protected void setRunner(GraphJobRunner<V, E, M> runner) {
this.runner = runner;
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1556691&r1=1556690&r2=1556691&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Thu Jan 9 01:10:59 2014
@@ -112,4 +112,19 @@ public interface VertexInterface<V exten
*/
public M getValue();
+ /**
+ * Provides a value to the aggregator registered under the given name.
+ *
+ * @param name identifies an aggregator (the name used when registering it)
+ * @param value value to be aggregated
+ * @throws IOException if delivering the value to the aggregator fails
+ *
+ */
+ public void aggregate(String name, M value) throws IOException;
+
+ /**
+ * Returns the aggregated value of the aggregator registered under {@code name}.
+ */
+ public Writable getAggregatedValue(String name);
+
}
Modified:
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1556691&r1=1556690&r2=1556691&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Thu Jan 9 01:10:59 2014
@@ -44,7 +44,7 @@ import org.junit.Before;
public class TestSubmitGraphJob extends TestBSPMasterGroomServer {
String[] input = new String[] { "stackoverflow.com\tyahoo.com",
- "facebook.com\ttwitter.com",
+ "facebook.com\ttwitter.com",
"facebook.com\tgoogle.com\tnasa.gov",
"yahoo.com\tnasa.gov\tstackoverflow.com",
"twitter.com\tgoogle.com\tfacebook.com",
@@ -56,6 +56,7 @@ public class TestSubmitGraphJob extends
@SuppressWarnings("rawtypes")
private static final List<Class<? extends VerticesInfo>> vi = new
ArrayList<Class<? extends VerticesInfo>>();
+ @Override
@Before
public void setUp() throws Exception {
super.setUp();
@@ -84,7 +85,7 @@ public class TestSubmitGraphJob extends
// set the defaults
bsp.setMaxIteration(30);
- bsp.setAggregatorClass(AverageAggregator.class);
+ bsp.registerAggregator("avg", AverageAggregator.class);
bsp.setInputFormat(SequenceFileInputFormat.class);
bsp.setInputKeyClass(Text.class);
Modified:
hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java?rev=1556691&r1=1556690&r2=1556691&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java Thu Jan 9 01:10:59 2014
@@ -43,7 +43,7 @@ public class PageRank {
public static class PageRankVertex extends
Vertex<Text, NullWritable, DoubleWritable> {
-
+
static double DAMPING_FACTOR = 0.85;
static double MAXIMUM_CONVERGENCE_ERROR = 0.001;
@@ -74,7 +74,7 @@ public class PageRank {
}
// if we have not reached our global error yet, then proceed.
- DoubleWritable globalError = getLastAggregatedValue(0);
+ DoubleWritable globalError = (DoubleWritable) getAggregatedValue("avg");
if (globalError != null && this.getSuperstepCount() > 2
&& MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
voteToHalt();
@@ -84,6 +84,8 @@ public class PageRank {
// in each superstep we are going to send a new rank to our neighbours
sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
/ this.getEdges().size()));
+
+ this.aggregate("avg", this.getValue());
}
}
@@ -126,7 +128,7 @@ public class PageRank {
}
// error
- pageJob.setAggregatorClass(AverageAggregator.class);
+ pageJob.registerAggregator("avg", AverageAggregator.class);
// Vertex reader
pageJob.setVertexInputReaderClass(PagerankSeqReader.class);