Author: edwardyoon
Date: Thu Jan 23 12:50:13 2014
New Revision: 1560672
URL: http://svn.apache.org/r1560672
Log:
HAMA-838: Refactor aggregator.
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/SumAggregator.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAbsDiffAggregator.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAverageAggregator.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestMinMaxAggregator.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSumAggregator.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Jan
23 12:50:13 2014
@@ -459,11 +459,13 @@ public final class BSPPeerImpl<K1, V1, K
}
public final void close() {
- long combinedMessages = this.getCounter(PeerCounter.TOTAL_MESSAGES_SENT)
- .getCounter()
- - this.getCounter(PeerCounter.TOTAL_MESSAGES_RECEIVED).getCounter();
- this.getCounter(PeerCounter.TOTAL_MESSAGES_COMBINED).increment(
- combinedMessages);
+ if (conf.get(Constants.COMBINER_CLASS) != null) {
+ long combinedMessages = this.getCounter(PeerCounter.TOTAL_MESSAGES_SENT)
+ .getCounter()
+ - this.getCounter(PeerCounter.TOTAL_MESSAGES_RECEIVED).getCounter();
+ this.getCounter(PeerCounter.TOTAL_MESSAGES_COMBINED).increment(
+ combinedMessages);
+ }
// there are many catches, because we want to close always every component
// even if the one before failed.
Modified:
hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
(original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
Thu Jan 23 12:50:13 2014
@@ -19,7 +19,6 @@ package org.apache.hama.examples;
import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
@@ -71,13 +70,15 @@ public class PageRank {
}
double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
+ aggregate(0, this.getValue());
}
// if we have not reached our global error yet, then proceed.
- DoubleWritable globalError = getLastAggregatedValue(0);
-
+ DoubleWritable globalError = getAggregatedValue(0);
+
if (globalError != null && this.getSuperstepCount() > 2
&& MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
+ System.out.println(globalError);
voteToHalt();
} else {
// in each superstep we are going to send a new rank to our neighbours
@@ -126,7 +127,7 @@ public class PageRank {
// error
pageJob.setAggregatorClass(AverageAggregator.class);
-
+
// Vertex reader
pageJob.setVertexInputReaderClass(PagerankSeqReader.class);
@@ -153,7 +154,7 @@ public class PageRank {
if (args.length < 2)
printUsage();
- HamaConfiguration conf = new HamaConfiguration(new Configuration());
+ HamaConfiguration conf = new HamaConfiguration();
GraphJob pageJob = createJob(args, conf);
long startTime = System.currentTimeMillis();
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java
Thu Jan 23 12:50:13 2014
@@ -25,13 +25,12 @@ import org.apache.hadoop.io.DoubleWritab
* (sums them up) them.
*/
public class AbsDiffAggregator extends
- AbstractAggregator<DoubleWritable, Vertex<?, ?, DoubleWritable>> {
+ AbstractAggregator<DoubleWritable> {
double absoluteDifference = 0.0d;
@Override
- public void aggregate(Vertex<?, ?, DoubleWritable> v,
- DoubleWritable oldValue, DoubleWritable newValue) {
+ public void aggregate(DoubleWritable oldValue, DoubleWritable newValue) {
// make sure it's nullsafe
if (oldValue != null) {
absoluteDifference += Math.abs(oldValue.get() - newValue.get());
@@ -41,8 +40,7 @@ public class AbsDiffAggregator extends
// when a master aggregates he aggregated values, he calls this, so let's
just
// sum up here.
@Override
- public void aggregate(Vertex<?, ?, DoubleWritable> vertex,
- DoubleWritable value) {
+ public void aggregate(DoubleWritable value) {
absoluteDifference += value.get();
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
---
hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
(original)
+++
hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
Thu Jan 23 12:50:13 2014
@@ -26,8 +26,8 @@ import org.apache.hadoop.io.Writable;
* For tracking cases it increments an internal counter on each call of
* aggregate.
*/
-public abstract class AbstractAggregator<M extends Writable, VERTEX extends
Vertex<?, ?, M>>
- implements Aggregator<M, VERTEX> {
+public abstract class AbstractAggregator<M extends Writable>
+ implements Aggregator<M> {
private int timesAggregated = 0;
@@ -52,7 +52,7 @@ public abstract class AbstractAggregator
* this will always be null.
*/
@Override
- public void aggregate(VERTEX vertex, M value) {
+ public void aggregate(M value) {
}
@@ -62,7 +62,7 @@ public abstract class AbstractAggregator
* implementation in this class.Please make sure that you are null-checking
* vertex, since on a master task this will always be null.
*/
- public void aggregate(VERTEX vertex, M oldValue, M newValue) {
+ public void aggregate(M oldValue, M newValue) {
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
Thu Jan 23 12:50:13 2014
@@ -41,7 +41,7 @@ import com.google.common.base.Preconditi
public final class AggregationRunner<V extends WritableComparable, E extends
Writable, M extends Writable> {
// multiple aggregator arrays
- private Aggregator<M, Vertex<V, E, M>>[] aggregators;
+ private Aggregator<M>[] aggregators;
private Writable[] globalAggregatorResult;
private IntWritable[] globalAggregatorIncrement;
private boolean[] isAbstractAggregator;
@@ -49,7 +49,7 @@ public final class AggregationRunner<V e
private Text[] aggregatorValueFlag;
private Text[] aggregatorIncrementFlag;
// aggregator on the master side
- private Aggregator<M, Vertex<V, E, M>>[] masterAggregator;
+ private Aggregator<M>[] masterAggregator;
private boolean enabled = false;
private Configuration conf;
@@ -110,8 +110,7 @@ public final class AggregationRunner<V e
updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue());
if (isAbstractAggregator[i]) {
updatedCnt.put(aggregatorIncrementFlag[i],
- ((AbstractAggregator<M, Vertex<V, E, M>>) aggregators[i])
- .getTimesAggregated());
+ ((AbstractAggregator<M>) aggregators[i]).getTimesAggregated());
}
}
for (int i = 0; i < aggregators.length; i++) {
@@ -133,16 +132,14 @@ public final class AggregationRunner<V e
* @param lastValue the value before compute().
* @param v the vertex.
*/
- public void aggregateVertex(M lastValue, Vertex<V, E, M> v) {
+ public void aggregateVertex(int index, M lastValue, M value) {
if (isEnabled()) {
- for (int i = 0; i < this.aggregators.length; i++) {
- Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
- aggregator.aggregate(v, v.getValue());
- if (isAbstractAggregator[i]) {
- AbstractAggregator<M, Vertex<V, E, M>> intern =
(AbstractAggregator<M, Vertex<V, E, M>>) aggregator;
- intern.aggregate(v, lastValue, v.getValue());
- intern.aggregateInternal();
- }
+ Aggregator<M> aggregator = this.aggregators[index];
+ aggregator.aggregate(value);
+ if (isAbstractAggregator[index]) {
+ AbstractAggregator<M> intern = (AbstractAggregator<M>) aggregator;
+ intern.aggregate(lastValue, value);
+ intern.aggregateInternal();
}
}
}
@@ -157,7 +154,7 @@ public final class AggregationRunner<V e
for (int i = 0; i < masterAggregator.length; i++) {
Writable lastAggregatedValue = masterAggregator[i].getValue();
if (isAbstractAggregator[i]) {
- final AbstractAggregator<M, Vertex<V, E, M>> intern =
((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[i]);
+ final AbstractAggregator<M> intern = ((AbstractAggregator<M>)
masterAggregator[i]);
final Writable finalizeAggregation = intern.finalizeAggregation();
if (intern.finalizeAggregation() != null) {
lastAggregatedValue = finalizeAggregation;
@@ -207,7 +204,7 @@ public final class AggregationRunner<V e
*/
public void masterReadAggregatedValue(Text textIndex, M value) {
int index = Integer.parseInt(textIndex.toString().split(";")[1]);
- masterAggregator[index].aggregate(null, value);
+ masterAggregator[index].aggregate(value);
}
/**
@@ -217,15 +214,15 @@ public final class AggregationRunner<V e
public void masterReadAggregatedIncrementalValue(Text textIndex, M value) {
int index = Integer.parseInt(textIndex.toString().split(";")[1]);
if (isAbstractAggregator[index]) {
- ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[index])
+ ((AbstractAggregator<M>) masterAggregator[index])
.addTimesAggregated(((IntWritable) value).get());
}
}
@SuppressWarnings("unchecked")
- private Aggregator<M, Vertex<V, E, M>> getNewAggregator(String clsName) {
+ private Aggregator<M> getNewAggregator(String clsName) {
try {
- return (Aggregator<M, Vertex<V, E, M>>) ReflectionUtils.newInstance(
+ return (Aggregator<M>) ReflectionUtils.newInstance(
conf.getClassByName(clsName), conf);
} catch (ClassNotFoundException e) {
e.printStackTrace();
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java Thu
Jan 23 12:50:13 2014
@@ -27,12 +27,12 @@ import org.apache.hadoop.io.Writable;
* The result of an aggregator from the last superstep can be picked up by the
* vertex itself via {@link Vertex}#getLastAggregatedValue();
*/
-public interface Aggregator<M extends Writable, VERTEX extends Vertex<?, ?,
?>> {
+public interface Aggregator<M extends Writable> {
/**
* Observes a new vertex value.
*/
- public void aggregate(VERTEX vertex, M value);
+ public void aggregate(M value);
/**
* Gets a vertex value.
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Thu Jan
23 12:50:13 2014
@@ -46,7 +46,6 @@ public class GraphJob extends BSPJob {
public final static String VERTEX_OUTPUT_WRITER_CLASS_ATTR =
"hama.graph.vertex.output.writer.class";
public final static String AGGREGATOR_CLASS_ATTR =
"hama.graph.aggregator.class";
- public final static String VERTEX_MESSAGE_COMBINER_CLASS_ATTR =
"hama.vertex.message.combiner.class";
/**
* Creates a new Graph Job with the given configuration and an exampleClass.
@@ -164,7 +163,7 @@ public class GraphJob extends BSPJob {
@Override
public void setCombinerClass(Class<? extends Combiner<? extends Writable>>
cls) {
ensureState(JobState.DEFINE);
- conf.setClass(VERTEX_MESSAGE_COMBINER_CLASS_ATTR, cls, Combiner.class);
+ conf.setClass(Constants.COMBINER_CLASS, cls, Combiner.class);
}
/**
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Thu Jan 23 12:50:13 2014
@@ -78,7 +78,6 @@ public final class GraphJobRunner<V exte
public static final Text FLAG_VERTEX_TOTAL_VERTICES = new Text(
S_FLAG_VERTEX_TOTAL_VERTICES);
- public static final String MESSAGE_COMBINER_CLASS_KEY =
"hama.vertex.message.combiner.class";
public static final String VERTEX_CLASS_KEY = "hama.graph.vertex.class";
private HamaConfiguration conf;
@@ -269,14 +268,12 @@ public final class GraphJobRunner<V exte
}
if (!vertex.isHalted()) {
- M lastValue = vertex.getValue();
if (iterable == null) {
vertex.compute(Collections.<M> emptyList());
} else {
vertex.compute(iterable);
currentMessage = iterable.getOverflowMessage();
}
- getAggregationRunner().aggregateVertex(lastValue, vertex);
activeVertices++;
}
@@ -338,7 +335,6 @@ public final class GraphJobRunner<V exte
* Seed the vertices first with their own values in compute. This is the
first
* superstep after the vertices have been loaded.
*/
- @SuppressWarnings("unused")
private void doInitialSuperstep(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
@@ -347,7 +343,6 @@ public final class GraphJobRunner<V exte
IDSkippingIterator<V, E, M> skippingIterator = vertices.skippingIterator();
while (skippingIterator.hasNext()) {
Vertex<V, E, M> vertex = skippingIterator.next();
- M lastValue = vertex.getValue();
// Calls setup method.
vertex.setup(conf);
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java Thu
Jan 23 12:50:13 2014
@@ -20,12 +20,12 @@ package org.apache.hama.graph;
import org.apache.hadoop.io.IntWritable;
public class MaxAggregator extends
- AbstractAggregator<IntWritable, Vertex<?, ?, IntWritable>> {
+ AbstractAggregator<IntWritable> {
int max = Integer.MIN_VALUE;
@Override
- public void aggregate(Vertex<?, ?, IntWritable> vertex, IntWritable value) {
+ public void aggregate(IntWritable value) {
if (value.get() > max) {
max = value.get();
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java Thu
Jan 23 12:50:13 2014
@@ -20,12 +20,12 @@ package org.apache.hama.graph;
import org.apache.hadoop.io.IntWritable;
public class MinAggregator extends
- AbstractAggregator<IntWritable, Vertex<?, ?, IntWritable>> {
+ AbstractAggregator<IntWritable> {
int min = Integer.MAX_VALUE;
@Override
- public void aggregate(Vertex<?, ?, IntWritable> vertex, IntWritable value) {
+ public void aggregate(IntWritable value) {
if (value.get() < min) {
min = value.get();
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
---
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
(original)
+++
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
Thu Jan 23 12:50:13 2014
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.Constants;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.message.OutgoingMessageManager;
@@ -51,14 +52,13 @@ public class OutgoingVertexMessagesManag
@SuppressWarnings("unchecked")
@Override
public void init(Configuration conf) {
- if (!conf.getClass(GraphJobRunner.MESSAGE_COMBINER_CLASS_KEY,
- Combiner.class).equals(Combiner.class)) {
- LOG.debug("Combiner class: "
- + conf.get(GraphJobRunner.MESSAGE_COMBINER_CLASS_KEY));
+ if (!conf.getClass(Constants.COMBINER_CLASS, Combiner.class).equals(
+ Combiner.class)) {
+ LOG.debug("Combiner class: " + conf.get(Constants.COMBINER_CLASS));
combiner = (Combiner<Writable>) org.apache.hadoop.util.ReflectionUtils
- .newInstance(conf.getClass("hama.vertex.message.combiner.class",
- Combiner.class), conf);
+ .newInstance(conf.getClass(Constants.COMBINER_CLASS, Combiner.class),
+ conf);
}
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/SumAggregator.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/SumAggregator.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/SumAggregator.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/SumAggregator.java Thu
Jan 23 12:50:13 2014
@@ -23,13 +23,12 @@ import org.apache.hadoop.io.DoubleWritab
* Sums all vertex values globally.
*/
public class SumAggregator extends
- AbstractAggregator<DoubleWritable, Vertex<?, ?, DoubleWritable>> {
+ AbstractAggregator<DoubleWritable> {
double sum = 0.0d;
@Override
- public void aggregate(Vertex<?, ?, DoubleWritable> vertex,
- DoubleWritable value) {
+ public void aggregate(DoubleWritable value) {
sum += value.get();
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Thu Jan 23
12:50:13 2014
@@ -58,6 +58,7 @@ public abstract class Vertex<V extends W
private transient GraphJobRunner<V, E, M> runner;
private V vertexID;
+ private M oldValue;
private M value;
private List<Edge<V, E>> edges;
@@ -183,6 +184,7 @@ public abstract class Vertex<V extends W
@Override
public void setValue(M value) {
+ this.oldValue = this.value;
this.value = value;
}
@@ -194,31 +196,6 @@ public abstract class Vertex<V extends W
return runner.getMaxIteration();
}
- /**
- * Get the last aggregated value of the defined aggregator, null if nothing
- * was configured or not returned a result. You have to supply an index, the
- * index is defined by the order you set the aggregator classes in
- * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
- * so if you have a single aggregator you can retrieve it via
- * {@link #getLastAggregatedValue}(0).
- */
- @SuppressWarnings("unchecked")
- public M getLastAggregatedValue(int index) {
- return (M) runner.getLastAggregatedValue(index);
- }
-
- /**
- * Get the number of aggregated vertices in the last superstep. Or null if no
- * aggregator is available.You have to supply an index, the index is defined
- * by the order you set the aggregator classes in
- * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
- * so if you have a single aggregator you can retrieve it via
- * {@link #getNumLastAggregatedVertices}(0).
- */
- public IntWritable getNumLastAggregatedVertices(int index) {
- return runner.getNumLastAggregatedVertices(index);
- }
-
public int getNumPeers() {
return runner.getPeer().getNumPeers();
}
@@ -406,4 +383,35 @@ public abstract class Vertex<V extends W
vertex.readFields(dis);
return vertex;
}
+
+ @Override
+ public void aggregate(int index, M value) throws IOException {
+ this.runner.getAggregationRunner().aggregateVertex(index, oldValue, value);
+ }
+
+ /**
+ * Get the last aggregated value of the defined aggregator, null if nothing
+ * was configured or not returned a result. You have to supply an index, the
+ * index is defined by the order you set the aggregator classes in
+ * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
+ * so if you have a single aggregator you can retrieve it via
+ * {@link #getAggregatedValue}(0).
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public M getAggregatedValue(int index) {
+ return (M) runner.getLastAggregatedValue(index);
+ }
+
+ /**
+ * Get the number of aggregated vertices in the last superstep. Or null if no
+ * aggregator is available. You have to supply an index, the index is defined
+ * by the order you set the aggregator classes in
+ * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
+ * so if you have a single aggregator you can retrieve it via
+ * {@link #getNumLastAggregatedVertices}(0).
+ */
+ public IntWritable getNumLastAggregatedVertices(int index) {
+ return runner.getNumLastAggregatedVertices(index);
+ }
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
Thu Jan 23 12:50:13 2014
@@ -111,5 +111,20 @@ public interface VertexInterface<V exten
* Gets the vertex value
*/
public M getValue();
+
+ /**
+ * Provides a value to the specified aggregator.
+ *
+ * @throws IOException
+ *
+ * @param index identifies an aggregator
+ * @param value value to be aggregated
+ */
+ public void aggregate(int index, M value) throws IOException;
+
+ /**
+ * Returns the value of the specified aggregator.
+ */
+ public Writable getAggregatedValue(int index);
}
Modified:
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAbsDiffAggregator.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAbsDiffAggregator.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
---
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAbsDiffAggregator.java
(original)
+++
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAbsDiffAggregator.java
Thu Jan 23 12:50:13 2014
@@ -27,9 +27,9 @@ public class TestAbsDiffAggregator exten
@Test
public void testAggregator() {
AbsDiffAggregator diff = new AbsDiffAggregator();
- diff.aggregate(null, new DoubleWritable(5), new DoubleWritable(2));
- diff.aggregate(null, new DoubleWritable(5), new DoubleWritable(2));
- diff.aggregate(null, null, new DoubleWritable(5));
+ diff.aggregate(new DoubleWritable(5), new DoubleWritable(2));
+ diff.aggregate(new DoubleWritable(5), new DoubleWritable(2));
+ diff.aggregate(null, new DoubleWritable(5));
// 0, because this is totally worthless for diffs
assertEquals(0, diff.getTimesAggregated().get());
Modified:
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAverageAggregator.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAverageAggregator.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
---
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAverageAggregator.java
(original)
+++
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAverageAggregator.java
Thu Jan 23 12:50:13 2014
@@ -27,11 +27,11 @@ public class TestAverageAggregator exten
@Test
public void testAggregator() {
AverageAggregator diff = new AverageAggregator();
- diff.aggregate(null, new DoubleWritable(5), new DoubleWritable(2));
+ diff.aggregate(new DoubleWritable(5), new DoubleWritable(2));
diff.aggregateInternal();
- diff.aggregate(null, new DoubleWritable(5), new DoubleWritable(2));
+ diff.aggregate(new DoubleWritable(5), new DoubleWritable(2));
diff.aggregateInternal();
- diff.aggregate(null, null, new DoubleWritable(5));
+ diff.aggregate(null, new DoubleWritable(5));
diff.aggregateInternal();
assertEquals(3, diff.getTimesAggregated().get());
Modified:
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestMinMaxAggregator.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestMinMaxAggregator.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
---
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestMinMaxAggregator.java
(original)
+++
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestMinMaxAggregator.java
Thu Jan 23 12:50:13 2014
@@ -27,8 +27,8 @@ public class TestMinMaxAggregator extend
@Test
public void testMinAggregator() {
MinAggregator diff = new MinAggregator();
- diff.aggregate(null, new IntWritable(5));
- diff.aggregate(null, new IntWritable(25));
+ diff.aggregate(new IntWritable(5));
+ diff.aggregate(new IntWritable(25));
assertEquals(5, diff.getValue().get());
}
@@ -36,8 +36,8 @@ public class TestMinMaxAggregator extend
@Test
public void testMaxAggregator() {
MaxAggregator diff = new MaxAggregator();
- diff.aggregate(null, new IntWritable(5));
- diff.aggregate(null, new IntWritable(25));
+ diff.aggregate(new IntWritable(5));
+ diff.aggregate(new IntWritable(25));
assertEquals(25, diff.getValue().get());
}
Modified:
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSumAggregator.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSumAggregator.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSumAggregator.java
(original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSumAggregator.java
Thu Jan 23 12:50:13 2014
@@ -27,8 +27,8 @@ public class TestSumAggregator extends T
@Test
public void testAggregator() {
SumAggregator diff = new SumAggregator();
- diff.aggregate(null, new DoubleWritable(5));
- diff.aggregate(null, new DoubleWritable(5));
+ diff.aggregate(new DoubleWritable(5));
+ diff.aggregate(new DoubleWritable(5));
assertEquals(10, (int) diff.getValue().get());
}
Modified:
hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java?rev=1560672&r1=1560671&r2=1560672&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
(original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
Thu Jan 23 12:50:13 2014
@@ -74,7 +74,7 @@ public class PageRank {
}
// if we have not reached our global error yet, then proceed.
- DoubleWritable globalError = this.getLastAggregatedValue(0);
+ DoubleWritable globalError = this.getAggregatedValue(0);
if (globalError != null && this.getSuperstepCount() > 2
&& MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
voteToHalt();