Author: tjungblut
Date: Wed May 23 15:28:16 2012
New Revision: 1341895
URL: http://svn.apache.org/viewvc?rev=1341895&view=rev
Log:
[HAMA-579] Add multiple aggregators
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
Modified: incubator/hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed May 23 15:28:16 2012
@@ -18,7 +18,8 @@ Release 0.5 - April 10, 2012
BUG FIXES
IMPROVEMENTS
-
+
+ HAMA-579: Add multiple aggregators (tjungblut)
HAMA-576: Improve sendMessages in Vertex (tjungblut)
HAMA-575: Generify graph package (tjungblut)
HAMA-571: Provide graph repair function in GraphJobRunner (tjungblut)
Modified:
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
---
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
(original)
+++
incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
Wed May 23 15:28:16 2012
@@ -75,7 +75,7 @@ public class PageRank {
}
// if we have not reached our global error yet, then proceed.
- DoubleWritable globalError = getLastAggregatedValue();
+ DoubleWritable globalError = getLastAggregatedValue(0);
if (globalError != null && this.getSuperstepCount() > 2
&& MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
return;
Modified:
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
---
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java
(original)
+++
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java
Wed May 23 15:28:16 2012
@@ -24,12 +24,14 @@ import org.apache.hadoop.io.DoubleWritab
* after the compute, then calculates the difference and globally accumulates
* (sums them up) them.
*/
-public class AbsDiffAggregator extends AbstractAggregator<DoubleWritable> {
+public class AbsDiffAggregator extends
+ AbstractAggregator<DoubleWritable, Vertex<?, DoubleWritable, ?>> {
double absoluteDifference = 0.0d;
@Override
- public void aggregate(DoubleWritable oldValue, DoubleWritable newValue) {
+ public void aggregate(Vertex<?, DoubleWritable, ?> v,
+ DoubleWritable oldValue, DoubleWritable newValue) {
// make sure it's nullsafe
if (oldValue != null) {
absoluteDifference += Math.abs(oldValue.get() - newValue.get());
Modified:
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
---
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
(original)
+++
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
Wed May 23 15:28:16 2012
@@ -26,8 +26,8 @@ import org.apache.hadoop.io.Writable;
* For tracking cases it increments an internal counter on each call of
* aggregate.
*/
-public abstract class AbstractAggregator<V extends Writable> implements
- Aggregator<V> {
+public abstract class AbstractAggregator<V extends Writable, VERTEX extends
Vertex<?, V, ?>>
+ implements Aggregator<V, VERTEX> {
private int timesAggregated = 0;
@@ -48,18 +48,21 @@ public abstract class AbstractAggregator
/**
* Observes a value of a vertex after the compute method. This is intended to
* be overriden by the user and is just an empty implementation in this
class.
+ * Please make sure that you are null-checking vertex, since on a master task
+ * this will always be null.
*/
@Override
- public void aggregate(V value) {
+ public void aggregate(VERTEX vertex, V value) {
}
/**
* Observes the old value of a vertex and the new value at the same time.
This
* is intended to be overridden by the user and is just an empty
- * implementation in this class.
+ * implementation in this class. Please make sure that you are null-checking
+ * vertex, since on a master task this will always be null.
*/
- public void aggregate(V oldValue, V newValue) {
+ public void aggregate(VERTEX vertex, V oldValue, V newValue) {
}
Modified:
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
---
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java
(original)
+++
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java
Wed May 23 15:28:16 2012
@@ -27,12 +27,12 @@ import org.apache.hadoop.io.Writable;
* The result of an aggregator from the last superstep can be picked up by the
* vertex itself via {@link Vertex}#getLastAggregatedValue();
*/
-public interface Aggregator<V extends Writable> {
+public interface Aggregator<V extends Writable, VERTEX extends Vertex<?, ?,
?>> {
/**
* Observes a new vertex value.
*/
- public void aggregate(V value);
+ public void aggregate(VERTEX vertex, V value);
/**
* Gets a vertex value.
Modified:
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
---
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
(original)
+++
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
Wed May 23 15:28:16 2012
@@ -92,9 +92,21 @@ public class GraphJob extends BSPJob {
/**
* Set the aggregator for the job.
*/
- public void setAggregatorClass(@SuppressWarnings("rawtypes")
- Class<? extends Aggregator> cls) {
- conf.setClass(AGGREGATOR_CLASS_ATTR, cls, Aggregator.class);
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public void setAggregatorClass(Class<? extends Aggregator> cls) {
+ this.setAggregatorClass(new Class[] { cls });
+ }
+
+ /**
+ * Sets multiple aggregators for the job.
+ */
+ @SuppressWarnings("rawtypes")
+ public void setAggregatorClass(Class<? extends Aggregator>... cls) {
+ String classNames = "";
+ for (Class<? extends Aggregator> cl : cls) {
+ classNames += cl.getName() + ";";
+ }
+ conf.set(AGGREGATOR_CLASS_ATTR, classNames);
}
@SuppressWarnings("unchecked")
Modified:
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
---
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Wed May 23 15:28:16 2012
@@ -64,10 +64,6 @@ public final class GraphJobRunner<VERTEX
private static final Text FLAG_MESSAGE_COUNTS = new Text(
S_FLAG_MESSAGE_COUNTS);
- private static final Text FLAG_AGGREGATOR_VALUE = new Text(
- S_FLAG_AGGREGATOR_VALUE);
- private static final Text FLAG_AGGREGATOR_INCREMENT = new Text(
- S_FLAG_AGGREGATOR_INCREMENT);
public static final String MESSAGE_COMBINER_CLASS =
"hama.vertex.message.combiner.class";
public static final String GRAPH_REPAIR = "hama.graph.repair";
@@ -75,13 +71,16 @@ public final class GraphJobRunner<VERTEX
private Configuration conf;
private Combiner<VERTEX_VALUE> combiner;
- private Aggregator<VERTEX_VALUE> aggregator;
- private Writable globalAggregatorResult;
- private IntWritable globalAggregatorIncrement;
- private boolean isAbstractAggregator;
-
+ // multiple aggregator arrays
+ private Aggregator<VERTEX_VALUE, Vertex<VERTEX_ID, VERTEX_VALUE,
EDGE_VALUE_TYPE>>[] aggregators;
+ private Writable[] globalAggregatorResult;
+ private IntWritable[] globalAggregatorIncrement;
+ private boolean[] isAbstractAggregator;
+ private String[] aggregatorClassNames;
+ private Text[] aggregatorValueFlag;
+ private Text[] aggregatorIncrementFlag;
// aggregator on the master side
- private Aggregator<VERTEX_VALUE> masterAggregator;
+ private Aggregator<VERTEX_VALUE, Vertex<VERTEX_ID, VERTEX_VALUE,
EDGE_VALUE_TYPE>>[] masterAggregator;
private Map<VERTEX_ID, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>>
vertices = new HashMap<VERTEX_ID, Vertex<VERTEX_ID, VERTEX_VALUE,
EDGE_VALUE_TYPE>>();
@@ -141,17 +140,31 @@ public final class GraphJobRunner<VERTEX
conf.getClass("hama.vertex.message.combiner.class", Combiner.class),
conf);
}
-
- if (!conf.getClass("hama.graph.aggregator.class", Aggregator.class).equals(
- Aggregator.class)) {
- LOG.debug("Aggregator class: " + conf.get(MESSAGE_COMBINER_CLASS));
-
- aggregator = getNewAggregator();
- if (aggregator instanceof AbstractAggregator) {
- isAbstractAggregator = true;
- }
+ String aggregatorClasses = conf.get(GraphJob.AGGREGATOR_CLASS_ATTR);
+ if (aggregatorClasses != null) {
+ LOG.debug("Aggregator classes: " + aggregatorClasses);
+ aggregatorClassNames = aggregatorClasses.split(";");
+ // init to the split size
+ aggregators = new Aggregator[aggregatorClassNames.length];
+ globalAggregatorResult = new Writable[aggregatorClassNames.length];
+ globalAggregatorIncrement = new IntWritable[aggregatorClassNames.length];
+ isAbstractAggregator = new boolean[aggregatorClassNames.length];
+ aggregatorValueFlag = new Text[aggregatorClassNames.length];
+ aggregatorIncrementFlag = new Text[aggregatorClassNames.length];
if (isMasterTask(peer)) {
- masterAggregator = getNewAggregator();
+ masterAggregator = new Aggregator[aggregatorClassNames.length];
+ }
+ for (int i = 0; i < aggregatorClassNames.length; i++) {
+ aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
+ aggregatorValueFlag[i] = new Text(S_FLAG_AGGREGATOR_VALUE + ";" + i);
+ aggregatorIncrementFlag[i] = new Text(S_FLAG_AGGREGATOR_INCREMENT + ";"
+ + i);
+ if (aggregators[i] instanceof AbstractAggregator) {
+ isAbstractAggregator[i] = true;
+ }
+ if (isMasterTask(peer)) {
+ masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
+ }
}
}
@@ -165,15 +178,19 @@ public final class GraphJobRunner<VERTEX
msgIterator.add(v.getValue());
VERTEX_VALUE lastValue = v.getValue();
v.compute(msgIterator.iterator());
- if (aggregator != null) {
- aggregator.aggregate(v.getValue());
- if (isAbstractAggregator) {
- AbstractAggregator<VERTEX_VALUE> intern =
((AbstractAggregator<VERTEX_VALUE>) aggregator);
- intern.aggregate(lastValue, v.getValue());
- intern.aggregateInternal();
+ if (this.aggregators != null) {
+ for (int i = 0; i < this.aggregators.length; i++) {
+ Aggregator<VERTEX_VALUE, Vertex<VERTEX_ID, VERTEX_VALUE,
EDGE_VALUE_TYPE>> aggregator = this.aggregators[i];
+ aggregator.aggregate(v, v.getValue());
+ if (isAbstractAggregator[i]) {
+ AbstractAggregator<VERTEX_VALUE, Vertex<VERTEX_ID, VERTEX_VALUE,
EDGE_VALUE_TYPE>> intern = (AbstractAggregator<VERTEX_VALUE, Vertex<VERTEX_ID,
VERTEX_VALUE, EDGE_VALUE_TYPE>>) aggregator;
+ intern.aggregate(v, lastValue, v.getValue());
+ intern.aggregateInternal();
+ }
}
}
}
+ runAggregators(peer, 1);
iteration++;
}
@@ -200,20 +217,23 @@ public final class GraphJobRunner<VERTEX
updatedCnt.put(FLAG_MESSAGE_COUNTS,
new IntWritable(Integer.MIN_VALUE));
} else {
- if (aggregator != null) {
- Writable lastAggregatedValue = masterAggregator.getValue();
- if (isAbstractAggregator) {
- final AbstractAggregator<VERTEX_VALUE> intern =
((AbstractAggregator<VERTEX_VALUE>) aggregator);
- final Writable finalizeAggregation =
intern.finalizeAggregation();
- if (intern.finalizeAggregation() != null) {
- lastAggregatedValue = finalizeAggregation;
+ if (aggregators != null) {
+ for (int i = 0; i < masterAggregator.length; i++) {
+ Writable lastAggregatedValue = masterAggregator[i].getValue();
+ if (isAbstractAggregator[i]) {
+ final AbstractAggregator<VERTEX_VALUE, Vertex<VERTEX_ID,
VERTEX_VALUE, EDGE_VALUE_TYPE>> intern = ((AbstractAggregator<VERTEX_VALUE,
Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>>) aggregators[i]);
+ final Writable finalizeAggregation = intern
+ .finalizeAggregation();
+ if (intern.finalizeAggregation() != null) {
+ lastAggregatedValue = finalizeAggregation;
+ }
+ // this count is usually the times of active
+ // vertices in the graph
+ updatedCnt.put(aggregatorIncrementFlag[i],
+ intern.getTimesAggregated());
}
- // this count is usually the times of active
- // vertices in the graph
- updatedCnt.put(FLAG_AGGREGATOR_INCREMENT,
- intern.getTimesAggregated());
+ updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue);
}
- updatedCnt.put(FLAG_AGGREGATOR_VALUE, lastAggregatedValue);
}
}
for (String peerName : peer.getAllPeerNames()) {
@@ -222,17 +242,14 @@ public final class GraphJobRunner<VERTEX
}
// if we have an aggregator defined, we must make an additional sync
// to have the updated values available on all our peers.
- if (aggregator != null && iteration > 1) {
+ if (aggregators != null && iteration > 1) {
peer.sync();
MapWritable updatedValues = peer.getCurrentMessage().getMap();
- globalAggregatorResult = updatedValues.get(FLAG_AGGREGATOR_VALUE);
- globalAggregatorIncrement = (IntWritable) updatedValues
- .get(FLAG_AGGREGATOR_INCREMENT);
-
- aggregator = getNewAggregator();
- if (isMasterTask(peer)) {
- masterAggregator = getNewAggregator();
+ for (int i = 0; i < aggregators.length; i++) {
+ globalAggregatorResult[i] =
updatedValues.get(aggregatorValueFlag[i]);
+ globalAggregatorIncrement[i] = (IntWritable) updatedValues
+ .get(aggregatorIncrementFlag[i]);
}
IntWritable count = (IntWritable) updatedValues
.get(FLAG_MESSAGE_COUNTS);
@@ -257,32 +274,54 @@ public final class GraphJobRunner<VERTEX
.get(e.getKey());
VERTEX_VALUE lastValue = vertex.getValue();
vertex.compute(msgs.iterator());
- if (aggregator != null) {
- aggregator.aggregate(vertex.getValue());
- if (isAbstractAggregator) {
- AbstractAggregator<VERTEX_VALUE> intern =
((AbstractAggregator<VERTEX_VALUE>) aggregator);
- intern.aggregate(lastValue, vertex.getValue());
- intern.aggregateInternal();
+ if (aggregators != null) {
+ if (this.aggregators != null) {
+ for (int i = 0; i < this.aggregators.length; i++) {
+ Aggregator<VERTEX_VALUE, Vertex<VERTEX_ID, VERTEX_VALUE,
EDGE_VALUE_TYPE>> aggregator = this.aggregators[i];
+ aggregator.aggregate(vertex, vertex.getValue());
+ if (isAbstractAggregator[i]) {
+ AbstractAggregator<VERTEX_VALUE, Vertex<VERTEX_ID,
VERTEX_VALUE, EDGE_VALUE_TYPE>> intern = ((AbstractAggregator<VERTEX_VALUE,
Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>>) aggregator);
+ intern.aggregate(vertex, lastValue, vertex.getValue());
+ intern.aggregateInternal();
+ }
+ }
}
}
iterator.remove();
}
- // send msgCounts to the master task
- MapWritable updatedCnt = new MapWritable();
- updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(messagesSize));
- // also send aggregated values to the master
- if (aggregator != null) {
- updatedCnt.put(FLAG_AGGREGATOR_VALUE, aggregator.getValue());
- if (isAbstractAggregator) {
- updatedCnt.put(FLAG_AGGREGATOR_INCREMENT,
- ((AbstractAggregator<VERTEX_VALUE>) aggregator)
- .getTimesAggregated());
+ runAggregators(peer, messagesSize);
+ iteration++;
+ }
+ }
+
+ private void runAggregators(
+ BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable,
Writable, Writable, GraphJobMessage> peer,
+ int messagesSize) throws IOException {
+ // send msgCounts to the master task
+ MapWritable updatedCnt = new MapWritable();
+ updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(messagesSize));
+ // also send aggregated values to the master
+ if (aggregators != null) {
+ for (int i = 0; i < this.aggregators.length; i++) {
+ updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue());
+ if (isAbstractAggregator[i]) {
+ updatedCnt
+ .put(
+ aggregatorIncrementFlag[i],
+ ((AbstractAggregator<VERTEX_VALUE, Vertex<VERTEX_ID,
VERTEX_VALUE, EDGE_VALUE_TYPE>>) aggregators[i])
+ .getTimesAggregated());
+ }
+ }
+ for (int i = 0; i < aggregators.length; i++) {
+ // now create new aggregators for the next iteration
+ aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
+ if (isMasterTask(peer)) {
+ masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
}
}
- peer.send(masterTask, new GraphJobMessage(updatedCnt));
- iteration++;
}
+ peer.send(masterTask, new GraphJobMessage(updatedCnt));
}
@SuppressWarnings("unchecked")
@@ -305,20 +344,23 @@ public final class GraphJobRunner<VERTEX
msgs.add(value);
} else if (msg.isMapMessage()) {
for (Entry<Writable, Writable> e : msg.getMap().entrySet()) {
- VERTEX_ID vertexID = (VERTEX_ID) e.getKey();
+ Text vertexID = (Text) e.getKey();
if (FLAG_MESSAGE_COUNTS.equals(vertexID)) {
if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) {
updated = false;
} else {
globalUpdateCounts += ((IntWritable) e.getValue()).get();
}
- } else if (aggregator != null
- && FLAG_AGGREGATOR_VALUE.equals(vertexID)) {
- masterAggregator.aggregate((VERTEX_VALUE) e.getValue());
- } else if (aggregator != null
- && FLAG_AGGREGATOR_INCREMENT.equals(vertexID)) {
- if (isAbstractAggregator) {
- ((AbstractAggregator<VERTEX_VALUE>) masterAggregator)
+ } else if (aggregators != null
+ && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
+ int index = Integer.parseInt(vertexID.toString().split(";")[1]);
+ masterAggregator[index]
+ .aggregate(null, (VERTEX_VALUE) e.getValue());
+ } else if (aggregators != null
+ && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) {
+ int index = Integer.parseInt(vertexID.toString().split(";")[1]);
+ if (isAbstractAggregator[index]) {
+ ((AbstractAggregator<VERTEX_VALUE, Vertex<VERTEX_ID,
VERTEX_VALUE, EDGE_VALUE_TYPE>>) masterAggregator[index])
.addTimesAggregated(((IntWritable) e.getValue()).get());
}
}
@@ -462,9 +504,16 @@ public final class GraphJobRunner<VERTEX
}
@SuppressWarnings("unchecked")
- private Aggregator<VERTEX_VALUE> getNewAggregator() {
- return (Aggregator<VERTEX_VALUE>) ReflectionUtils.newInstance(
- conf.getClass("hama.graph.aggregator.class", Aggregator.class), conf);
+ private Aggregator<VERTEX_VALUE, Vertex<VERTEX_ID, VERTEX_VALUE,
EDGE_VALUE_TYPE>> getNewAggregator(
+ String clsName) {
+ try {
+ return (Aggregator<VERTEX_VALUE, Vertex<VERTEX_ID, VERTEX_VALUE,
EDGE_VALUE_TYPE>>) ReflectionUtils
+ .newInstance(conf.getClassByName(clsName), conf);
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ throw new IllegalArgumentException("Aggregator class " + clsName
+ + " could not be found or instantiated!");
}
private boolean isMasterTask(
@@ -484,12 +533,12 @@ public final class GraphJobRunner<VERTEX
return maxIteration;
}
- public final Writable getLastAggregatedValue() {
- return globalAggregatorResult;
+ public final Writable getLastAggregatedValue(int index) {
+ return globalAggregatorResult[index];
}
- public final IntWritable getNumLastAggregatedVertices() {
- return globalAggregatorIncrement;
+ public final IntWritable getNumLastAggregatedVertices(int index) {
+ return globalAggregatorIncrement[index];
}
}
Modified:
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
---
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java
(original)
+++
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java
Wed May 23 15:28:16 2012
@@ -19,12 +19,13 @@ package org.apache.hama.graph;
import org.apache.hadoop.io.IntWritable;
-public class MaxAggregator extends AbstractAggregator<IntWritable> {
+public class MaxAggregator extends
+ AbstractAggregator<IntWritable, Vertex<?, IntWritable, ?>> {
int max = Integer.MIN_VALUE;
@Override
- public void aggregate(IntWritable value) {
+ public void aggregate(Vertex<?, IntWritable, ?> vertex, IntWritable value) {
if (value.get() > max) {
max = value.get();
}
Modified:
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
---
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java
(original)
+++
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java
Wed May 23 15:28:16 2012
@@ -19,12 +19,13 @@ package org.apache.hama.graph;
import org.apache.hadoop.io.IntWritable;
-public class MinAggregator extends AbstractAggregator<IntWritable> {
+public class MinAggregator extends
+ AbstractAggregator<IntWritable, Vertex<?, IntWritable, ?>> {
int min = Integer.MAX_VALUE;
@Override
- public void aggregate(IntWritable value) {
+ public void aggregate(Vertex<?, IntWritable, ?> vertex, IntWritable value) {
if (value.get() < min) {
min = value.get();
}
Modified:
incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
(original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
Wed May 23 15:28:16 2012
@@ -92,19 +92,27 @@ public abstract class Vertex<ID_TYPE ext
/**
* Get the last aggregated value of the defined aggregator, null if nothing
- * was configured or not returned a result.
+ * was configured or not returned a result. You have to supply an index, the
+ * index is defined by the order you set the aggregator classes in
+ * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
+ * so if you have a single aggregator you can retrieve it via
+ * {@link #getLastAggregatedValue}(0).
*/
@SuppressWarnings("unchecked")
- public MSG_TYPE getLastAggregatedValue() {
- return (MSG_TYPE) runner.getLastAggregatedValue();
+ public MSG_TYPE getLastAggregatedValue(int index) {
+ return (MSG_TYPE) runner.getLastAggregatedValue(index);
}
/**
* Get the number of aggregated vertices in the last superstep. Or null if no
- * aggregator is available.
+ * aggregator is available. You have to supply an index, the index is defined
+ * by the order you set the aggregator classes in
+ * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
+ * so if you have a single aggregator you can retrieve it via
+ * {@link #getNumLastAggregatedVertices}(0).
*/
- public IntWritable getNumLastAggregatedVertices() {
- return runner.getNumLastAggregatedVertices();
+ public IntWritable getNumLastAggregatedVertices(int index) {
+ return runner.getNumLastAggregatedVertices(index);
}
public int getNumPeers() {
Modified:
incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
---
incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
(original)
+++
incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
Wed May 23 15:28:16 2012
@@ -75,6 +75,7 @@ public class TestSubmitGraphJob extends
private static String INPUT = "/tmp/pagerank-real-tmp.seq";
private static String OUTPUT = "/tmp/pagerank-real-out";
+ @SuppressWarnings("unchecked")
@Override
public void testSubmitJob() throws Exception {
@@ -97,7 +98,7 @@ public class TestSubmitGraphJob extends
// we need to include a vertex in its adjacency list,
// otherwise the pagerank result has a constant loss
bsp.set("hama.graph.self.ref", "true");
- bsp.setAggregatorClass(AverageAggregator.class);
+ bsp.setAggregatorClass(AverageAggregator.class, SumAggregator.class);
bsp.setVertexIDClass(Text.class);
bsp.setVertexValueClass(DoubleWritable.class);
Modified:
incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java?rev=1341895&r1=1341894&r2=1341895&view=diff
==============================================================================
---
incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
(original)
+++
incubator/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
Wed May 23 15:28:16 2012
@@ -64,10 +64,15 @@ public class PageRank {
}
double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
this.setValue(new DoubleWritable(alpha + (DAMPING_FACTOR * sum)));
+ if (this.getSuperstepCount() > 1) {
+ if(this.getLastAggregatedValue(1).get() < 0.99 ||
this.getLastAggregatedValue(1).get() > 1.0){
+ throw new RuntimeException("Sum aggregator hasn't summed
correctly! " + this.getLastAggregatedValue(1).get());
+ }
+ }
}
// if we have not reached our global error yet, then proceed.
- DoubleWritable globalError = getLastAggregatedValue();
+ DoubleWritable globalError = getLastAggregatedValue(0);
if (globalError != null && this.getSuperstepCount() > 2
&& MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
return;