This patch is written incorrectly. I'll revert this commit and re-open HAMA-838.
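For anyone following the issue: the commit replaced the old index-based aggregator API (setAggregatorClass / getLastAggregatedValue(int)) with name-based registerAggregator / aggregate(name, value) / getAggregatedValue(name) calls. A rough sketch of the new usage, condensed from the quoted diff below (SumVertexExample is just an illustrative name, and this is untested):

  import java.io.IOException;

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.DoubleWritable;
  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hama.HamaConfiguration;
  import org.apache.hama.graph.GraphJob;
  import org.apache.hama.graph.SumAggregator;
  import org.apache.hama.graph.Vertex;

  public class SumVertexExample {

    public static class SumVertex extends
        Vertex<Text, NullWritable, DoubleWritable> {

      @Override
      public void compute(Iterable<DoubleWritable> messages) throws IOException {
        // Send this vertex's value to the named aggregator on the master task.
        aggregate("mySum", getValue());

        // Read what the master aggregated in the previous superstep
        // (null until the first aggregation round has completed).
        DoubleWritable sum = (DoubleWritable) getAggregatedValue("mySum");
        if (sum != null && getSuperstepCount() > 1) {
          voteToHalt();
        }
      }
    }

    public static void main(String[] args) throws Exception {
      GraphJob job = new GraphJob(new HamaConfiguration(), SumVertexExample.class);
      job.setVertexClass(SumVertex.class);
      // Replaces the old index-based setAggregatorClass(...) call.
      job.registerAggregator("mySum", SumAggregator.class);
      // Plus the usual vertex ID/value/edge classes, input reader and
      // input/output formats, as configured in the quoted diff.
      job.setInputPath(new Path(args[0]));
      job.setOutputPath(new Path(args[1]));
      job.waitForCompletion(true);
    }
  }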
Please assign me. On Thu, Jan 9, 2014 at 10:10 AM, <edwardy...@apache.org> wrote: > Author: edwardyoon > Date: Thu Jan 9 01:10:59 2014 > New Revision: 1556691 > > URL: http://svn.apache.org/r1556691 > Log: > HAMA-838: Refactor aggregators > > Added: > > hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java > Modified: > hama/trunk/CHANGES.txt > hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java > > hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java > > hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java > hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java > hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java > > hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java > hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java > > Modified: hama/trunk/CHANGES.txt > URL: > http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1556691&r1=1556690&r2=1556691&view=diff > ============================================================================== > --- hama/trunk/CHANGES.txt (original) > +++ hama/trunk/CHANGES.txt Thu Jan 9 01:10:59 2014 > @@ -20,6 +20,7 @@ Release 0.7.0 (unreleased changes) > > IMPROVEMENTS > > + HAMA-838: Refactor aggregators (Anastasis Andronidis) > HAMA-783: Improve the InMemory verticesInfo implementations (edwardyoon) > HAMA-829: Improve code and fix Javadoc warnings in org.apache.hama.pipes > (Martin Illecker) > HAMA-808: Hama Pipes Testcase (Martin Illecker) > > Modified: > hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java > URL: > http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1556691&r1=1556690&r2=1556691&view=diff > ============================================================================== > --- hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java > (original) > +++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java > Thu Jan 9 01:10:59 2014 > @@ -40,6 +40,7 @@ import org.apache.hama.graph.VertexInput > * Real pagerank with dangling node contribution. > */ > public class PageRank { > + private static final String AVG_AGGREGATOR = "average.aggregator"; > > public static class PageRankVertex extends > Vertex<Text, NullWritable, DoubleWritable> { > @@ -63,29 +64,29 @@ public class PageRank { > public void compute(Iterable<DoubleWritable> messages) throws > IOException { > // initialize this vertex to 1 / count of global vertices in this graph > if (this.getSuperstepCount() == 0) { > - this.setValue(new DoubleWritable(1.0 / this.getNumVertices())); > + setValue(new DoubleWritable(1.0 / this.getNumVertices())); > } else if (this.getSuperstepCount() >= 1) { > double sum = 0; > for (DoubleWritable msg : messages) { > sum += msg.get(); > } > double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices(); > - this.setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR))); > + setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR))); > + aggregate(AVG_AGGREGATOR, this.getValue()); > } > > // if we have not reached our global error yet, then proceed. 
> - DoubleWritable globalError = getLastAggregatedValue(0); > + DoubleWritable globalError = (DoubleWritable) > getAggregatedValue(AVG_AGGREGATOR); > + > if (globalError != null && this.getSuperstepCount() > 2 > && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) { > voteToHalt(); > - return; > + } else { > + // in each superstep we are going to send a new rank to our > neighbours > + sendMessageToNeighbors(new DoubleWritable(this.getValue().get() > + / this.getEdges().size())); > } > - > - // in each superstep we are going to send a new rank to our neighbours > - sendMessageToNeighbors(new DoubleWritable(this.getValue().get() > - / this.getEdges().size())); > } > - > } > > public static class PagerankSeqReader > @@ -126,7 +127,7 @@ public class PageRank { > } > > // error > - pageJob.setAggregatorClass(AverageAggregator.class); > + pageJob.registerAggregator(AVG_AGGREGATOR, AverageAggregator.class); > > // Vertex reader > pageJob.setVertexInputReaderClass(PagerankSeqReader.class); > > Added: > hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java > URL: > http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java?rev=1556691&view=auto > ============================================================================== > --- > hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java > (added) > +++ > hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java > Thu Jan 9 01:10:59 2014 > @@ -0,0 +1,208 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one > + * or more contributor license agreements. See the NOTICE file > + * distributed with this work for additional information > + * regarding copyright ownership. The ASF licenses this file > + * to you under the Apache License, Version 2.0 (the > + * "License"); you may not use this file except in compliance > + * with the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. 
> + */ > +package org.apache.hama.examples; > + > +import java.io.BufferedReader; > +import java.io.IOException; > +import java.io.InputStreamReader; > + > +import junit.framework.TestCase; > + > +import org.apache.hadoop.conf.Configuration; > +import org.apache.hadoop.fs.FileStatus; > +import org.apache.hadoop.fs.FileSystem; > +import org.apache.hadoop.fs.Path; > +import org.apache.hadoop.io.DoubleWritable; > +import org.apache.hadoop.io.LongWritable; > +import org.apache.hadoop.io.NullWritable; > +import org.apache.hadoop.io.Text; > +import org.apache.hama.HamaConfiguration; > +import org.apache.hama.bsp.HashPartitioner; > +import org.apache.hama.bsp.TextInputFormat; > +import org.apache.hama.bsp.TextOutputFormat; > +import org.apache.hama.graph.GraphJob; > +import org.apache.hama.graph.SumAggregator; > +import org.apache.hama.graph.Vertex; > +import org.apache.hama.graph.VertexInputReader; > +import org.junit.Test; > + > +/** > + * Unit test for aggregators > + */ > +public class AggregatorsTest extends TestCase { > + private static String OUTPUT = "/tmp/page-out"; > + private Configuration conf = new HamaConfiguration(); > + private FileSystem fs; > + > + private void deleteTempDirs() { > + try { > + if (fs.exists(new Path(OUTPUT))) > + fs.delete(new Path(OUTPUT), true); > + } catch (IOException e) { > + e.printStackTrace(); > + } > + } > + > + private void verifyResult() throws IOException { > + FileStatus[] globStatus = fs.globStatus(new Path(OUTPUT + "/part-*")); > + for (FileStatus fts : globStatus) { > + BufferedReader reader = new BufferedReader(new InputStreamReader( > + fs.open(fts.getPath()))); > + String line = null; > + > + String[] results = { "6.0", "2.0", "3.0", "4.0" }; > + > + for (int i = 1; i < 5; i++) { > + line = reader.readLine(); > + String[] split = line.split("\t"); > + assertTrue(split[0].equals(String.valueOf(i))); > + assertTrue(split[1].equals(results[i - 1])); > + System.out.println(split[0] + " : " + split[1]); > + } > + } > + } > + > + @Override > + protected void setUp() throws Exception { > + super.setUp(); > + fs = FileSystem.get(conf); > + } > + > + @Test > + public void test() throws IOException, InterruptedException, > + ClassNotFoundException { > + try { > + CustomAggregators > + .main(new String[] { "src/test/resources/dg.txt", OUTPUT }); > + verifyResult(); > + } finally { > + deleteTempDirs(); > + } > + } > + > + static class CustomAggregators { > + > + public static class GraphTextReader > + extends > + VertexInputReader<LongWritable, Text, Text, NullWritable, > DoubleWritable> { > + > + @Override > + public boolean parseVertex(LongWritable key, Text value, > + Vertex<Text, NullWritable, DoubleWritable> vertex) throws > Exception { > + > + vertex.setVertexID(value); > + vertex > + .setValue(new > DoubleWritable(Double.parseDouble(value.toString()))); > + > + return true; > + } > + } > + > + public static class GraphVertex extends > + Vertex<Text, NullWritable, DoubleWritable> { > + > + @Override > + public void compute(Iterable<DoubleWritable> msgs) throws IOException { > + > + // We will send 2 custom messages on superstep 2 and 4 only! > + if (this.getSuperstepCount() == 2) { > + this.aggregate("mySum", new DoubleWritable(1.0)); > + } > + > + if (this.getSuperstepCount() == 4) { > + this.aggregate("mySum", new DoubleWritable(2.0)); > + } > + > + // We will get the first aggrigation result from our custom > aggregator > + // on superstep 3, > + // and we will store the result only in vertex 4. > + // This vertex should have value = 4. 
> + if (this.getSuperstepCount() == 3 > + && this.getVertexID().toString().equals("4")) { > + this.setValue((DoubleWritable) this.getAggregatedValue("mySum")); > + } > + > + // By setting vertex number 3 to halt, we will see a change on the > + // aggregating results > + // in both custom and global aggregators. > + // This vertex should have value = 3. > + if (this.getSuperstepCount() == 3 > + && this.getVertexID().toString().equals("3")) { > + this.voteToHalt(); > + } > + > + // This vertex should have value = 6 (3 vertices are working x 2 the > + // custom value) > + if (this.getSuperstepCount() == 5 > + && this.getVertexID().toString().equals("1")) { > + this.setValue((DoubleWritable) this.getAggregatedValue("mySum")); > + } > + > + if (this.getSuperstepCount() == 6) { > + this.voteToHalt(); > + } > + } > + } > + > + public static void main(String[] args) throws IOException, > + InterruptedException, ClassNotFoundException { > + if (args.length != 2) { > + printUsage(); > + } > + HamaConfiguration conf = new HamaConfiguration(new Configuration()); > + GraphJob graphJob = createJob(args, conf); > + long startTime = System.currentTimeMillis(); > + if (graphJob.waitForCompletion(true)) { > + System.out.println("Job Finished in " > + + (System.currentTimeMillis() - startTime) / 1000.0 + " > seconds"); > + } > + } > + > + private static void printUsage() { > + System.out.println("Usage: <input> <output>"); > + System.exit(-1); > + } > + > + private static GraphJob createJob(String[] args, HamaConfiguration conf) > + throws IOException { > + GraphJob graphJob = new GraphJob(conf, CustomAggregators.class); > + graphJob.setJobName("Custom Aggregators"); > + graphJob.setVertexClass(GraphVertex.class); > + > + graphJob.registerAggregator("mySum", SumAggregator.class); > + > + graphJob.setInputPath(new Path(args[0])); > + graphJob.setOutputPath(new Path(args[1])); > + > + graphJob.setVertexIDClass(Text.class); > + graphJob.setVertexValueClass(DoubleWritable.class); > + graphJob.setEdgeValueClass(NullWritable.class); > + > + graphJob.setInputFormat(TextInputFormat.class); > + > + graphJob.setVertexInputReaderClass(GraphTextReader.class); > + graphJob.setPartitioner(HashPartitioner.class); > + > + graphJob.setOutputFormat(TextOutputFormat.class); > + graphJob.setOutputKeyClass(Text.class); > + graphJob.setOutputValueClass(DoubleWritable.class); > + > + return graphJob; > + } > + } > +} > > Modified: > hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java > URL: > http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java?rev=1556691&r1=1556690&r2=1556691&view=diff > ============================================================================== > --- > hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java > (original) > +++ > hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java > Thu Jan 9 01:10:59 2014 > @@ -17,22 +17,20 @@ > */ > package org.apache.hama.graph; > > -import java.io.IOException; > +import java.util.HashMap; > import java.util.HashSet; > +import java.util.Map; > +import java.util.Map.Entry; > import java.util.Set; > > import org.apache.hadoop.conf.Configuration; > import org.apache.hadoop.io.IntWritable; > -import org.apache.hadoop.io.LongWritable; > import org.apache.hadoop.io.MapWritable; > import org.apache.hadoop.io.Text; > import org.apache.hadoop.io.Writable; > import org.apache.hadoop.io.WritableComparable; > import org.apache.hadoop.util.ReflectionUtils; > import 
org.apache.hama.bsp.BSPPeer; > -import org.apache.hama.bsp.sync.SyncException; > - > -import com.google.common.base.Preconditions; > > /** > * Runner class to do the tasks that need to be done if aggregation was > @@ -41,117 +39,31 @@ import com.google.common.base.Preconditi > */ > @SuppressWarnings("rawtypes") > public final class AggregationRunner<V extends WritableComparable, E extends > Writable, M extends Writable> { > + private Map<String, Aggregator> Aggregators; > + private Map<String, Writable> aggregatorResults; > + private Set<String> aggregatorsUsed; > > - // multiple aggregator arrays > - private Aggregator<M, Vertex<V, E, M>>[] aggregators; > - private Set<Integer> skipAggregators; > - private Writable[] globalAggregatorResult; > - private IntWritable[] globalAggregatorIncrement; > - private boolean[] isAbstractAggregator; > - private String[] aggregatorClassNames; > - private Text[] aggregatorValueFlag; > - private Text[] aggregatorIncrementFlag; > - // aggregator on the master side > - private Aggregator<M, Vertex<V, E, M>>[] masterAggregator; > - > - private boolean enabled = false; > private Configuration conf; > + private Text textWrap = new Text(); > > - @SuppressWarnings("unchecked") > public void setupAggregators( > BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) > { > this.conf = peer.getConfiguration(); > - String aggregatorClasses = peer.getConfiguration().get( > + > + this.aggregatorResults = new HashMap<String, Writable>(4); > + this.Aggregators = new HashMap<String, Aggregator>(4); > + this.aggregatorsUsed = new HashSet<String>(4); > + > + String customAggregatorClasses = peer.getConfiguration().get( > GraphJob.AGGREGATOR_CLASS_ATTR); > - this.skipAggregators = new HashSet<Integer>(); > - if (aggregatorClasses != null) { > - enabled = true; > - aggregatorClassNames = aggregatorClasses.split(";"); > - // init to the split size > - aggregators = new Aggregator[aggregatorClassNames.length]; > - globalAggregatorResult = new Writable[aggregatorClassNames.length]; > - globalAggregatorIncrement = new > IntWritable[aggregatorClassNames.length]; > - isAbstractAggregator = new boolean[aggregatorClassNames.length]; > - aggregatorValueFlag = new Text[aggregatorClassNames.length]; > - aggregatorIncrementFlag = new Text[aggregatorClassNames.length]; > - if (GraphJobRunner.isMasterTask(peer)) { > - masterAggregator = new Aggregator[aggregatorClassNames.length]; > - } > - for (int i = 0; i < aggregatorClassNames.length; i++) { > - aggregators[i] = getNewAggregator(aggregatorClassNames[i]); > - aggregatorValueFlag[i] = new Text( > - GraphJobRunner.S_FLAG_AGGREGATOR_VALUE + ";" + i); > - aggregatorIncrementFlag[i] = new Text( > - GraphJobRunner.S_FLAG_AGGREGATOR_INCREMENT + ";" + i); > - if (aggregators[i] instanceof AbstractAggregator) { > - isAbstractAggregator[i] = true; > - } > - if (GraphJobRunner.isMasterTask(peer)) { > - masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]); > - } > - } > - } > - } > > - /** > - * Runs the aggregators by sending their values to the master task. 
> - * @param changedVertexCnt > - */ > - public void sendAggregatorValues( > - BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer, > - int activeVertices, int changedVertexCnt) throws IOException { > - // send msgCounts to the master task > - MapWritable updatedCnt = new MapWritable(); > - updatedCnt.put(GraphJobRunner.FLAG_MESSAGE_COUNTS, new IntWritable( > - activeVertices)); > - // send total number of vertices changes > - updatedCnt.put(GraphJobRunner.FLAG_VERTEX_ALTER_COUNTER, new > LongWritable( > - changedVertexCnt)); > - // also send aggregated values to the master > - if (aggregators != null) { > - for (int i = 0; i < this.aggregators.length; i++) { > - if (!this.skipAggregators.contains(i)) { > - updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue()); > - if (isAbstractAggregator[i]) { > - updatedCnt.put(aggregatorIncrementFlag[i], > - ((AbstractAggregator<M, Vertex<V, E, M>>) aggregators[i]) > - .getTimesAggregated()); > - } > - } > - } > - for (int i = 0; i < aggregators.length; i++) { > - if (!this.skipAggregators.contains(i)) { > - // now create new aggregators for the next iteration > - aggregators[i] = getNewAggregator(aggregatorClassNames[i]); > - if (GraphJobRunner.isMasterTask(peer)) { > - masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]); > - } > - } > - } > - } > - peer.send(GraphJobRunner.getMasterTask(peer), new GraphJobMessage( > - updatedCnt)); > - } > + if (customAggregatorClasses != null) { > + String[] custAggrs = customAggregatorClasses.split(";"); > > - /** > - * Aggregates the last value before computation and the value after the > - * computation. > - * > - * @param lastValue the value before compute(). > - * @param v the vertex. > - */ > - public void aggregateVertex(M lastValue, Vertex<V, E, M> v) { > - if (isEnabled()) { > - for (int i = 0; i < this.aggregators.length; i++) { > - if (!this.skipAggregators.contains(i)) { > - Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i]; > - aggregator.aggregate(v, v.getValue()); > - if (isAbstractAggregator[i]) { > - AbstractAggregator<M, Vertex<V, E, M>> intern = > (AbstractAggregator<M, Vertex<V, E, M>>) aggregator; > - intern.aggregate(v, lastValue, v.getValue()); > - intern.aggregateInternal(); > - } > - } > + for (String aggr : custAggrs) { > + String[] Name_AggrClass = aggr.split("@", 2); > + this.Aggregators.put(Name_AggrClass[0], > + getNewAggregator(Name_AggrClass[1])); > } > } > } > @@ -161,26 +73,20 @@ public final class AggregationRunner<V e > * peer and updates the given map accordingly. > */ > public void doMasterAggregation(MapWritable updatedCnt) { > - if (isEnabled()) { > - // work through the master aggregators > - for (int i = 0; i < masterAggregator.length; i++) { > - if (!this.skipAggregators.contains(i)) { > - Writable lastAggregatedValue = masterAggregator[i].getValue(); > - if (isAbstractAggregator[i]) { > - final AbstractAggregator<M, Vertex<V, E, M>> intern = > ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[i]); > - final Writable finalizeAggregation = > intern.finalizeAggregation(); > - if (intern.finalizeAggregation() != null) { > - lastAggregatedValue = finalizeAggregation; > - } > - // this count is usually the times of active > - // vertices in the graph > - updatedCnt.put(aggregatorIncrementFlag[i], > - intern.getTimesAggregated()); > - } > - updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue); > - } > - } > + // Get results only from used aggregators. 
> + for (String name : this.aggregatorsUsed) { > + updatedCnt.put(new Text(name), this.Aggregators.get(name).getValue()); > + } > + this.aggregatorsUsed.clear(); > + > + // Reset all custom aggregators. TODO: Change the aggregation interface > to > + // include clean() method. > + Map<String, Aggregator> tmp = new HashMap<String, Aggregator>(4); > + for (Entry<String, Aggregator> e : this.Aggregators.entrySet()) { > + String aggClass = e.getValue().getClass().getName(); > + tmp.put(e.getKey(), getNewAggregator(aggClass)); > } > + this.Aggregators = tmp; > } > > /** > @@ -190,13 +96,20 @@ public final class AggregationRunner<V e > * we haven't seen any messages anymore. > */ > public boolean receiveAggregatedValues(MapWritable updatedValues, > - long iteration) throws IOException, SyncException, > InterruptedException { > - // map is the first value that is in the queue > - for (int i = 0; i < aggregators.length; i++) { > - globalAggregatorResult[i] = > updatedValues.get(aggregatorValueFlag[i]); > - globalAggregatorIncrement[i] = (IntWritable) updatedValues > - .get(aggregatorIncrementFlag[i]); > + long iteration) { > + // In every superstep, we create a new result collection as we don't save > + // history. > + // If a value is missing, the user will take a null result. By creating a > + // new collection > + // every time, we can reduce the network cost (because we send less > + // information by skipping null values) > + // But we are losing in GC. > + this.aggregatorResults = new HashMap<String, Writable>(4); > + for (String name : this.Aggregators.keySet()) { > + this.textWrap.set(name); > + this.aggregatorResults.put(name, updatedValues.get(textWrap)); > } > + > IntWritable count = (IntWritable) updatedValues > .get(GraphJobRunner.FLAG_MESSAGE_COUNTS); > if (count != null && count.get() == Integer.MIN_VALUE) { > @@ -206,47 +119,16 @@ public final class AggregationRunner<V e > } > > /** > - * @return true if aggregators were defined. Normally used by the internal > - * stateful methods, outside shouldn't use it too extensively. > + * Method to let the custom master aggregator read messages from peers and > + * aggregate a value. > */ > - public boolean isEnabled() { > - return enabled; > - } > - > - /** > - * Method to let the master read messages from peers and aggregate a value. > - */ > - public void masterReadAggregatedValue(Text textIndex, M value) { > - int index = Integer.parseInt(textIndex.toString().split(";")[1]); > - masterAggregator[index].aggregate(null, value); > - } > - > - /** > - * Method to let the master read messages from peers and aggregate the > - * incremental value. > - */ > - public void masterReadAggregatedIncrementalValue(Text textIndex, M value) { > - int index = Integer.parseInt(textIndex.toString().split(";")[1]); > - if (isAbstractAggregator[index]) { > - ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[index]) > - .addTimesAggregated(((IntWritable) value).get()); > - } > - } > + @SuppressWarnings("unchecked") > + public void masterAggregation(Text name, Writable value) { > + String nameIdx = name.toString().split(";", 2)[1]; > + this.Aggregators.get(nameIdx).aggregate(null, value); > > - /** > - * This method adds an id of an aggregator that will be skipped in the > current > - * superstep. > - */ > - public void addSkipAggregator(int index) { > - this.skipAggregators.add(index); > - } > - > - /** > - * This method adds an id of an aggregator that will be skipped in the > current > - * superstep. 
> - */ > - void resetSkipAggregators() { > - this.skipAggregators.clear(); > + // When it's time to send the values, we can see which aggregators are > used. > + this.aggregatorsUsed.add(nameIdx); > } > > @SuppressWarnings("unchecked") > @@ -261,13 +143,7 @@ public final class AggregationRunner<V e > + " could not be found or instantiated!"); > } > > - public final Writable getLastAggregatedValue(int index) { > - return globalAggregatorResult[Preconditions.checkPositionIndex(index, > - globalAggregatorResult.length)]; > - } > - > - public final IntWritable getNumLastAggregatedVertices(int index) { > - return globalAggregatorIncrement[Preconditions.checkPositionIndex(index, > - globalAggregatorIncrement.length)]; > + public final Writable getAggregatedValue(String name) { > + return this.aggregatorResults.get(name); > } > } > > Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java > URL: > http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1556691&r1=1556690&r2=1556691&view=diff > ============================================================================== > --- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java > (original) > +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Thu > Jan 9 01:10:59 2014 > @@ -102,23 +102,20 @@ public class GraphJob extends BSPJob { > } > > /** > - * Set the aggregator for the job. > - */ > - @SuppressWarnings({ "rawtypes", "unchecked" }) > - public void setAggregatorClass(Class<? extends Aggregator> cls) { > - this.setAggregatorClass(new Class[] { cls }); > - } > - > - /** > - * Sets multiple aggregators for the job. > - */ > + * Custom aggregator registration. Add a custom aggregator > + * that will aggregate massages sent from the user. > + * > + * @param name identifies an aggregator > + * @param aggregatorClass the aggregator class > + */ > @SuppressWarnings("rawtypes") > - public void setAggregatorClass(Class<? extends Aggregator>... cls) { > - String classNames = ""; > - for (Class<? extends Aggregator> cl : cls) { > - classNames += cl.getName() + ";"; > - } > - conf.set(AGGREGATOR_CLASS_ATTR, classNames); > + public void registerAggregator(String name, Class<? 
extends > + Aggregator> aggregatorClass) { > + String prevAggrs = this.conf.get(AGGREGATOR_CLASS_ATTR, ""); > + > + prevAggrs += name + "@" + aggregatorClass.getName() + ";"; > + > + this.conf.set(AGGREGATOR_CLASS_ATTR, prevAggrs); > } > > /** > > Modified: > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java > URL: > http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1556691&r1=1556690&r2=1556691&view=diff > ============================================================================== > --- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java > (original) > +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java > Thu Jan 9 01:10:59 2014 > @@ -92,7 +92,6 @@ public final class GraphJobMessage imple > } else { > vertexId.write(out); > } > - > } > > public void fastReadFields(DataInput in) throws IOException { > @@ -217,6 +216,7 @@ public final class GraphJobMessage imple > buffer = new DataInputBuffer(); > } > > + @Override > public synchronized int compare(byte[] b1, int s1, int l1, byte[] b2, > int s2, int l2) { > try { > > Modified: > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java > URL: > http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1556691&r1=1556690&r2=1556691&view=diff > ============================================================================== > --- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java > (original) > +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java > Thu Jan 9 01:10:59 2014 > @@ -64,12 +64,11 @@ public final class GraphJobRunner<V exte > // make sure that these values don't collide with the vertex names > public static final String S_FLAG_MESSAGE_COUNTS = "hama.0"; > public static final String S_FLAG_AGGREGATOR_VALUE = "hama.1"; > - public static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2"; > - public static final String S_FLAG_VERTEX_INCREASE = "hama.3"; > - public static final String S_FLAG_VERTEX_DECREASE = "hama.4"; > - public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.5"; > - public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.6"; > - public static final String S_FLAG_AGGREGATOR_SKIP = "hama.7"; > + public static final String S_FLAG_VERTEX_INCREASE = "hama.2"; > + public static final String S_FLAG_VERTEX_DECREASE = "hama.3"; > + public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.4"; > + public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.5"; > + > public static final Text FLAG_MESSAGE_COUNTS = new > Text(S_FLAG_MESSAGE_COUNTS); > public static final Text FLAG_VERTEX_INCREASE = new Text( > S_FLAG_VERTEX_INCREASE); > @@ -79,8 +78,6 @@ public final class GraphJobRunner<V exte > S_FLAG_VERTEX_ALTER_COUNTER); > public static final Text FLAG_VERTEX_TOTAL_VERTICES = new Text( > S_FLAG_VERTEX_TOTAL_VERTICES); > - public static final Text FLAG_AGGREGATOR_SKIP = new Text( > - S_FLAG_AGGREGATOR_SKIP); > > public static final String MESSAGE_COMBINER_CLASS_KEY = > "hama.vertex.message.combiner.class"; > public static final String VERTEX_CLASS_KEY = "hama.graph.vertex.class"; > @@ -183,22 +180,10 @@ public final class GraphJobRunner<V exte > BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) > throws IOException, SyncException, InterruptedException { > > - if (isMasterTask(peer) && iteration == 1) { > - MapWritable updatedCnt = new 
MapWritable(); > - updatedCnt.put( > - FLAG_VERTEX_TOTAL_VERTICES, > - new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES) > - .getCounter()))); > - // send the updates from the master tasks back to the slaves > - for (String peerName : peer.getAllPeerNames()) { > - peer.send(peerName, new GraphJobMessage(updatedCnt)); > - } > - } > - > - // this is only done in every second iteration > - if (isMasterTask(peer) && iteration > 1) { > + // This run only on master > + if (isMasterTask(peer)) { > MapWritable updatedCnt = new MapWritable(); > - // send total number of vertices. > + // send total number of vertices > updatedCnt.put( > FLAG_VERTEX_TOTAL_VERTICES, > new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES) > @@ -214,26 +199,26 @@ public final class GraphJobRunner<V exte > peer.send(peerName, new GraphJobMessage(updatedCnt)); > } > } > - if (getAggregationRunner().isEnabled() && iteration > 1) { > - // in case we need to sync, we need to replay the messages that already > - // are added to the queue. This prevents loosing messages when using > - // aggregators. > - if (firstVertexMessage != null) { > - peer.send(peer.getPeerName(), firstVertexMessage); > - } > - GraphJobMessage msg = null; > - while ((msg = peer.getCurrentMessage()) != null) { > - peer.send(peer.getPeerName(), msg); > - } > - // now sync > - peer.sync(); > - // now the map message must be read that might be send from the master > - updated = getAggregationRunner().receiveAggregatedValues( > - peer.getCurrentMessage().getMap(), iteration); > - // set the first vertex message back to the message it had before sync > - firstVertexMessage = peer.getCurrentMessage(); > + > + // in case we need to sync, we need to replay the messages that already > + // are added to the queue. This prevents loosing messages when using > + // aggregators. > + if (firstVertexMessage != null) { > + peer.send(peer.getPeerName(), firstVertexMessage); > } > - this.aggregationRunner.resetSkipAggregators(); > + > + GraphJobMessage msg = null; > + while ((msg = peer.getCurrentMessage()) != null) { > + peer.send(peer.getPeerName(), msg); > + } > + > + // now sync > + peer.sync(); > + // now the map message must be read that might be send from the master > + updated = getAggregationRunner().receiveAggregatedValues( > + peer.getCurrentMessage().getMap(), iteration); > + // set the first vertex message back to the message it had before sync > + firstVertexMessage = peer.getCurrentMessage(); > return firstVertexMessage; > } > > @@ -274,7 +259,6 @@ public final class GraphJobRunner<V exte > } > > if (!vertex.isHalted()) { > - M lastValue = vertex.getValue(); > if (iterable == null) { > vertex.compute(Collections.<M> emptyList()); > } else { > @@ -286,7 +270,6 @@ public final class GraphJobRunner<V exte > } > currentMessage = iterable.getOverflowMessage(); > } > - getAggregationRunner().aggregateVertex(lastValue, vertex); > activeVertices++; > } > > @@ -296,8 +279,7 @@ public final class GraphJobRunner<V exte > } > vertices.finishSuperstep(); > > - getAggregationRunner().sendAggregatorValues(peer, activeVertices, > - this.changedVertexCnt); > + sendControllValues(activeVertices, this.changedVertexCnt); > iteration++; > } > > @@ -357,15 +339,13 @@ public final class GraphJobRunner<V exte > while (skippingIterator.hasNext()) { > Vertex<V, E, M> vertex = skippingIterator.next(); > > - M lastValue = vertex.getValue(); > // Calls setup method. 
> vertex.setup(conf); > vertex.compute(Collections.singleton(vertex.getValue())); > - getAggregationRunner().aggregateVertex(lastValue, vertex); > vertices.finishVertexComputation(vertex); > } > vertices.finishSuperstep(); > - getAggregationRunner().sendAggregatorValues(peer, 1, > this.changedVertexCnt); > + sendControllValues(1, this.changedVertexCnt); > iteration++; > } > > @@ -594,14 +574,10 @@ public final class GraphJobRunner<V exte > } else { > globalUpdateCounts += ((IntWritable) e.getValue()).get(); > } > - } else if (getAggregationRunner().isEnabled() > - && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) { > - getAggregationRunner().masterReadAggregatedValue(vertexID, > - (M) e.getValue()); > - } else if (getAggregationRunner().isEnabled() > - && > vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) { > - getAggregationRunner().masterReadAggregatedIncrementalValue( > - vertexID, (M) e.getValue()); > + > + } else if > (vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) { > + this.getAggregationRunner().masterAggregation(vertexID, > + e.getValue()); > } else if (FLAG_VERTEX_INCREASE.equals(vertexID)) { > dynamicAdditions = true; > addVertex((Vertex<V, E, M>) e.getValue()); > @@ -619,21 +595,11 @@ public final class GraphJobRunner<V exte > "A message to increase vertex count is in a wrong place: " > + peer); > } > - } else if (FLAG_AGGREGATOR_SKIP.equals(vertexID)) { > - if (isMasterTask(peer)) { > - this.getAggregationRunner().addSkipAggregator( > - ((IntWritable) e.getValue()).get()); > - } else { > - throw new UnsupportedOperationException( > - "A message to skip aggregators is in a wrong peer: " + > peer); > - } > } > } > - > } else { > throw new UnsupportedOperationException("Unknown message type: " + > msg); > } > - > } > > // If we applied any changes to vertices, we need to call finishAdditions > @@ -677,23 +643,20 @@ public final class GraphJobRunner<V exte > } > > /** > - * Gets the last aggregated value at the given index. The index is > dependend > - * on how the aggregators were configured during job setup phase. > - * > - * @return the value of the aggregator, or null if none was defined. > - */ > - public final Writable getLastAggregatedValue(int index) { > - return getAggregationRunner().getLastAggregatedValue(index); > - } > - > - /** > - * Gets the last aggregated number of vertices at the given index. The > index > - * is dependend on how the aggregators were configured during job setup > phase. > + * Runs internal aggregators and send their values to the master task. > * > - * @return the value of the aggregator, or null if none was defined. 
> + * @param activeVertices number of active vertices in this peer > + * @param changedVertexCnt number of added/removed vertices in a superstep > */ > - public final IntWritable getNumLastAggregatedVertices(int index) { > - return getAggregationRunner().getNumLastAggregatedVertices(index); > + private void sendControllValues(int activeVertices, int changedVertexCnt) > + throws IOException { > + // send msgCounts to the master task > + MapWritable updatedCnt = new MapWritable(); > + updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(activeVertices)); > + // send total number of vertices changes > + updatedCnt.put(FLAG_VERTEX_ALTER_COUNTER, > + new LongWritable(changedVertexCnt)); > + peer.send(getMasterTask(peer), new GraphJobMessage(updatedCnt)); > } > > /** > > Modified: > hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java > URL: > http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java?rev=1556691&r1=1556690&r2=1556691&view=diff > ============================================================================== > --- > hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java > (original) > +++ > hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java > Thu Jan 9 01:10:59 2014 > @@ -82,6 +82,7 @@ public class OffHeapVerticesInfo<V exten > vertices.dump(); > } > > + @Override > public void addVertex(Vertex<V, E, M> vertex) { > vertices.put(vertex.getVertexID(), vertex); > } > @@ -108,6 +109,7 @@ public class OffHeapVerticesInfo<V exten > vertices.clear(); > } > > + @Override > public int size() { > return (int) this.vertices.entries(); > } > > Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java > URL: > http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1556691&r1=1556690&r2=1556691&view=diff > ============================================================================== > --- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java > (original) > +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Thu Jan > 9 01:10:59 2014 > @@ -27,8 +27,8 @@ import java.io.IOException; > import java.util.ArrayList; > import java.util.List; > > -import org.apache.hadoop.io.IntWritable; > import org.apache.hadoop.io.MapWritable; > +import org.apache.hadoop.io.Text; > import org.apache.hadoop.io.Writable; > import org.apache.hadoop.io.WritableComparable; > import org.apache.hama.HamaConfiguration; > @@ -75,7 +75,7 @@ public abstract class Vertex<V extends W > @Override > public void setup(HamaConfiguration conf) { > } > - > + > @Override > public void sendMessage(Edge<V, E> e, M msg) throws IOException { > runner.getPeer().send(getDestinationPeerName(e), > @@ -120,24 +120,26 @@ public abstract class Vertex<V extends W > private void alterVertexCounter(int i) throws IOException { > this.runner.setChangedVertexCnt(this.runner.getChangedVertexCnt() + i); > } > - > + > @Override > - public void addVertex(V vertexID, List<Edge<V, E>> edges, M value) throws > IOException { > + public void addVertex(V vertexID, List<Edge<V, E>> edges, M value) > + throws IOException { > MapWritable msg = new MapWritable(); > // Create the new vertex. 
> - Vertex<V, E, M> vertex = GraphJobRunner.<V, E, M> > newVertexInstance(GraphJobRunner.VERTEX_CLASS); > + Vertex<V, E, M> vertex = GraphJobRunner > + .<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS); > vertex.setEdges(edges); > vertex.setValue(value); > vertex.setVertexID(vertexID); > - > + > msg.put(GraphJobRunner.FLAG_VERTEX_INCREASE, vertex); > // Find the proper partition to host the new vertex. > - int partition = getPartitioner().getPartition(vertexID, value, > + int partition = getPartitioner().getPartition(vertexID, value, > runner.getPeer().getNumPeers()); > String destPeer = runner.getPeer().getAllPeerNames()[partition]; > - > + > runner.getPeer().send(destPeer, new GraphJobMessage(msg)); > - > + > alterVertexCounter(1); > } > > @@ -145,11 +147,11 @@ public abstract class Vertex<V extends W > public void remove() throws IOException { > MapWritable msg = new MapWritable(); > msg.put(GraphJobRunner.FLAG_VERTEX_DECREASE, this.vertexID); > - > + > // Get master task peer. > String destPeer = GraphJobRunner.getMasterTask(this.getPeer()); > runner.getPeer().send(destPeer, new GraphJobMessage(msg)); > - > + > alterVertexCounter(-1); > } > > @@ -192,31 +194,6 @@ public abstract class Vertex<V extends W > return runner.getMaxIteration(); > } > > - /** > - * Get the last aggregated value of the defined aggregator, null if nothing > - * was configured or not returned a result. You have to supply an index, > the > - * index is defined by the order you set the aggregator classes in > - * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at > zero, > - * so if you have a single aggregator you can retrieve it via > - * {@link #getLastAggregatedValue}(0). > - */ > - @SuppressWarnings("unchecked") > - public M getLastAggregatedValue(int index) { > - return (M) runner.getLastAggregatedValue(index); > - } > - > - /** > - * Get the number of aggregated vertices in the last superstep. Or null if > no > - * aggregator is available.You have to supply an index, the index is > defined > - * by the order you set the aggregator classes in > - * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at > zero, > - * so if you have a single aggregator you can retrieve it via > - * {@link #getNumLastAggregatedVertices}(0). > - */ > - public IntWritable getNumLastAggregatedVertices(int index) { > - return runner.getNumLastAggregatedVertices(index); > - } > - > public int getNumPeers() { > return runner.getPeer().getNumPeers(); > } > @@ -245,21 +222,6 @@ public abstract class Vertex<V extends W > this.votedToHalt = true; > } > > - /** > - * Disable an aggregator for the next superstep. The returning value of > - * the aggregator will be null. > - */ > - public void skipAggregator(int index) throws IOException { > - MapWritable msg = new MapWritable(); > - msg.put(GraphJobRunner.FLAG_AGGREGATOR_SKIP, new IntWritable(index)); > - > - this.runner.getAggregationRunner().addSkipAggregator(index); > - > - // Get master task peer. 
> - String destPeer = GraphJobRunner.getMasterTask(this.getPeer()); > - runner.getPeer().send(destPeer, new GraphJobMessage(msg)); > - } > - > void setActive() { > this.votedToHalt = false; > } > @@ -314,7 +276,7 @@ public abstract class Vertex<V extends W > } > this.value.readFields(in); > } > - > + > this.edges = new ArrayList<Edge<V, E>>(); > if (in.readBoolean()) { > int num = in.readInt(); > @@ -345,7 +307,7 @@ public abstract class Vertex<V extends W > out.writeBoolean(true); > vertexID.write(out); > } > - > + > if (value == null) { > out.writeBoolean(false); > } else { > @@ -399,6 +361,30 @@ public abstract class Vertex<V extends W > > } > > + /** > + * Provides a value to the specified aggregator. > + * > + * @throws IOException > + * > + * @param name identifies an aggregator > + * @param value value to be aggregated > + */ > + @Override > + public void aggregate(String name, M value) throws IOException { > + MapWritable msg = new MapWritable(); > + msg.put(new Text(GraphJobRunner.S_FLAG_AGGREGATOR_VALUE + ";" + name), > + value); > + > + // Get master task peer. > + String destPeer = GraphJobRunner.getMasterTask(this.getPeer()); > + runner.getPeer().send(destPeer, new GraphJobMessage(msg)); > + } > + > + @Override > + public Writable getAggregatedValue(String name) { > + return this.runner.getAggregationRunner().getAggregatedValue(name); > + } > + > protected void setRunner(GraphJobRunner<V, E, M> runner) { > this.runner = runner; > } > > Modified: > hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java > URL: > http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1556691&r1=1556690&r2=1556691&view=diff > ============================================================================== > --- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java > (original) > +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java > Thu Jan 9 01:10:59 2014 > @@ -112,4 +112,19 @@ public interface VertexInterface<V exten > */ > public M getValue(); > > + /** > + * Provides a value to the specified aggregator. > + * > + * @throws IOException > + * > + * @param name identifies a aggregator > + * @param value value to be aggregated > + */ > + public void aggregate(String name, M value) throws IOException; > + > + /** > + * Returns the value of the specified aggregator. > + */ > + public Writable getAggregatedValue(String name); > + > } > > Modified: > hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java > URL: > http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1556691&r1=1556690&r2=1556691&view=diff > ============================================================================== > --- > hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java > (original) > +++ > hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java > Thu Jan 9 01:10:59 2014 > @@ -44,7 +44,7 @@ import org.junit.Before; > public class TestSubmitGraphJob extends TestBSPMasterGroomServer { > > String[] input = new String[] { "stackoverflow.com\tyahoo.com", > - "facebook.com\ttwitter.com", > + "facebook.com\ttwitter.com", > "facebook.com\tgoogle.com\tnasa.gov", > "yahoo.com\tnasa.gov\tstackoverflow.com", > "twitter.com\tgoogle.com\tfacebook.com", > @@ -56,6 +56,7 @@ public class TestSubmitGraphJob extends > @SuppressWarnings("rawtypes") > private static final List<Class<? 
extends VerticesInfo>> vi = new > ArrayList<Class<? extends VerticesInfo>>(); > > + @Override > @Before > public void setUp() throws Exception { > super.setUp(); > @@ -84,7 +85,7 @@ public class TestSubmitGraphJob extends > // set the defaults > bsp.setMaxIteration(30); > > - bsp.setAggregatorClass(AverageAggregator.class); > + bsp.registerAggregator("avg", AverageAggregator.class); > > bsp.setInputFormat(SequenceFileInputFormat.class); > bsp.setInputKeyClass(Text.class); > > Modified: > hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java > URL: > http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java?rev=1556691&r1=1556690&r2=1556691&view=diff > ============================================================================== > --- > hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java > (original) > +++ > hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java > Thu Jan 9 01:10:59 2014 > @@ -43,7 +43,7 @@ public class PageRank { > > public static class PageRankVertex extends > Vertex<Text, NullWritable, DoubleWritable> { > - > + > static double DAMPING_FACTOR = 0.85; > static double MAXIMUM_CONVERGENCE_ERROR = 0.001; > > @@ -74,7 +74,7 @@ public class PageRank { > } > > // if we have not reached our global error yet, then proceed. > - DoubleWritable globalError = getLastAggregatedValue(0); > + DoubleWritable globalError = (DoubleWritable) > getAggregatedValue("avg"); > if (globalError != null && this.getSuperstepCount() > 2 > && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) { > voteToHalt(); > @@ -84,6 +84,8 @@ public class PageRank { > // in each superstep we are going to send a new rank to our neighbours > sendMessageToNeighbors(new DoubleWritable(this.getValue().get() > / this.getEdges().size())); > + > + this.aggregate("avg", this.getValue()); > } > > } > @@ -126,7 +128,7 @@ public class PageRank { > } > > // error > - pageJob.setAggregatorClass(AverageAggregator.class); > + pageJob.registerAggregator("avg", AverageAggregator.class); > > // Vertex reader > pageJob.setVertexInputReaderClass(PagerankSeqReader.class); > > -- Best Regards, Edward J. Yoon @eddieyoon