Author: edwardyoon
Date: Thu Jan 23 08:16:44 2014
New Revision: 1560605
URL: http://svn.apache.org/r1560605
Log:
Revert r1556691
Removed:
hama/trunk/examples/src/test/java/org/apache/hama/examples/AggregatorsTest.java
Modified:
hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
Modified:
hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1560605&r1=1560604&r2=1560605&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
(original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
Thu Jan 23 08:16:44 2014
@@ -40,7 +40,6 @@ import org.apache.hama.graph.VertexInput
* Real pagerank with dangling node contribution.
*/
public class PageRank {
- private static final String AVG_AGGREGATOR = "average.aggregator";
public static class PageRankVertex extends
Vertex<Text, NullWritable, DoubleWritable> {
@@ -72,11 +71,10 @@ public class PageRank {
}
double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
- aggregate(AVG_AGGREGATOR, this.getValue());
}
// if we have not reached our global error yet, then proceed.
- DoubleWritable globalError = (DoubleWritable)
getAggregatedValue(AVG_AGGREGATOR);
+ DoubleWritable globalError = getLastAggregatedValue(0);
if (globalError != null && this.getSuperstepCount() > 2
&& MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
@@ -127,8 +125,8 @@ public class PageRank {
}
// error
- pageJob.registerAggregator(AVG_AGGREGATOR, AverageAggregator.class);
-
+ pageJob.setAggregatorClass(AverageAggregator.class);
+
// Vertex reader
pageJob.setVertexInputReaderClass(PagerankSeqReader.class);
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java?rev=1560605&r1=1560604&r2=1560605&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
Thu Jan 23 08:16:44 2014
@@ -17,20 +17,22 @@
*/
package org.apache.hama.graph;
-import java.util.HashMap;
+import java.io.IOException;
import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+
+import com.google.common.base.Preconditions;
/**
* Runner class to do the tasks that need to be done if aggregation was
@@ -39,31 +41,117 @@ import org.apache.hama.bsp.BSPPeer;
*/
@SuppressWarnings("rawtypes")
public final class AggregationRunner<V extends WritableComparable, E extends
Writable, M extends Writable> {
- private Map<String, Aggregator> Aggregators;
- private Map<String, Writable> aggregatorResults;
- private Set<String> aggregatorsUsed;
+ // multiple aggregator arrays
+ private Aggregator<M, Vertex<V, E, M>>[] aggregators;
+ private Set<Integer> skipAggregators;
+ private Writable[] globalAggregatorResult;
+ private IntWritable[] globalAggregatorIncrement;
+ private boolean[] isAbstractAggregator;
+ private String[] aggregatorClassNames;
+ private Text[] aggregatorValueFlag;
+ private Text[] aggregatorIncrementFlag;
+ // aggregator on the master side
+ private Aggregator<M, Vertex<V, E, M>>[] masterAggregator;
+
+ private boolean enabled = false;
private Configuration conf;
- private Text textWrap = new Text();
+ @SuppressWarnings("unchecked")
public void setupAggregators(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
this.conf = peer.getConfiguration();
-
- this.aggregatorResults = new HashMap<String, Writable>(4);
- this.Aggregators = new HashMap<String, Aggregator>(4);
- this.aggregatorsUsed = new HashSet<String>(4);
-
- String customAggregatorClasses = peer.getConfiguration().get(
+ String aggregatorClasses = peer.getConfiguration().get(
GraphJob.AGGREGATOR_CLASS_ATTR);
+ this.skipAggregators = new HashSet<Integer>();
+ if (aggregatorClasses != null) {
+ enabled = true;
+ aggregatorClassNames = aggregatorClasses.split(";");
+ // init to the split size
+ aggregators = new Aggregator[aggregatorClassNames.length];
+ globalAggregatorResult = new Writable[aggregatorClassNames.length];
+ globalAggregatorIncrement = new IntWritable[aggregatorClassNames.length];
+ isAbstractAggregator = new boolean[aggregatorClassNames.length];
+ aggregatorValueFlag = new Text[aggregatorClassNames.length];
+ aggregatorIncrementFlag = new Text[aggregatorClassNames.length];
+ if (GraphJobRunner.isMasterTask(peer)) {
+ masterAggregator = new Aggregator[aggregatorClassNames.length];
+ }
+ for (int i = 0; i < aggregatorClassNames.length; i++) {
+ aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
+ aggregatorValueFlag[i] = new Text(
+ GraphJobRunner.S_FLAG_AGGREGATOR_VALUE + ";" + i);
+ aggregatorIncrementFlag[i] = new Text(
+ GraphJobRunner.S_FLAG_AGGREGATOR_INCREMENT + ";" + i);
+ if (aggregators[i] instanceof AbstractAggregator) {
+ isAbstractAggregator[i] = true;
+ }
+ if (GraphJobRunner.isMasterTask(peer)) {
+ masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
+ }
+ }
+ }
+ }
- if (customAggregatorClasses != null) {
- String[] custAggrs = customAggregatorClasses.split(";");
+ /**
+ * Runs the aggregators by sending their values to the master task.
+ * @param changedVertexCnt number of added/removed vertices in a superstep
+ */
+ public void sendAggregatorValues(
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
+ int activeVertices, int changedVertexCnt) throws IOException {
+ // send msgCounts to the master task
+ MapWritable updatedCnt = new MapWritable();
+ updatedCnt.put(GraphJobRunner.FLAG_MESSAGE_COUNTS, new IntWritable(
+ activeVertices));
+ // send total number of vertices changes
+ updatedCnt.put(GraphJobRunner.FLAG_VERTEX_ALTER_COUNTER, new LongWritable(
+ changedVertexCnt));
+ // also send aggregated values to the master
+ if (aggregators != null) {
+ for (int i = 0; i < this.aggregators.length; i++) {
+ if (!this.skipAggregators.contains(i)) {
+ updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue());
+ if (isAbstractAggregator[i]) {
+ updatedCnt.put(aggregatorIncrementFlag[i],
+ ((AbstractAggregator<M, Vertex<V, E, M>>) aggregators[i])
+ .getTimesAggregated());
+ }
+ }
+ }
+ for (int i = 0; i < aggregators.length; i++) {
+ if (!this.skipAggregators.contains(i)) {
+ // now create new aggregators for the next iteration
+ aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
+ if (GraphJobRunner.isMasterTask(peer)) {
+ masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
+ }
+ }
+ }
+ }
+ peer.send(GraphJobRunner.getMasterTask(peer), new GraphJobMessage(
+ updatedCnt));
+ }
- for (String aggr : custAggrs) {
- String[] Name_AggrClass = aggr.split("@", 2);
- this.Aggregators.put(Name_AggrClass[0],
- getNewAggregator(Name_AggrClass[1]));
+ /**
+ * Aggregates the last value before computation and the value after the
+ * computation.
+ *
+ * @param lastValue the value before compute().
+ * @param v the vertex.
+ */
+ public void aggregateVertex(M lastValue, Vertex<V, E, M> v) {
+ if (isEnabled()) {
+ for (int i = 0; i < this.aggregators.length; i++) {
+ if (!this.skipAggregators.contains(i)) {
+ Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
+ aggregator.aggregate(v, v.getValue());
+ if (isAbstractAggregator[i]) {
+ AbstractAggregator<M, Vertex<V, E, M>> intern =
(AbstractAggregator<M, Vertex<V, E, M>>) aggregator;
+ intern.aggregate(v, lastValue, v.getValue());
+ intern.aggregateInternal();
+ }
+ }
}
}
}
@@ -73,20 +161,26 @@ public final class AggregationRunner<V e
* peer and updates the given map accordingly.
*/
public void doMasterAggregation(MapWritable updatedCnt) {
- // Get results only from used aggregators.
- for (String name : this.aggregatorsUsed) {
- updatedCnt.put(new Text(name), this.Aggregators.get(name).getValue());
- }
- this.aggregatorsUsed.clear();
-
- // Reset all custom aggregators. TODO: Change the aggregation interface to
- // include clean() method.
- Map<String, Aggregator> tmp = new HashMap<String, Aggregator>(4);
- for (Entry<String, Aggregator> e : this.Aggregators.entrySet()) {
- String aggClass = e.getValue().getClass().getName();
- tmp.put(e.getKey(), getNewAggregator(aggClass));
+ if (isEnabled()) {
+ // work through the master aggregators
+ for (int i = 0; i < masterAggregator.length; i++) {
+ if (!this.skipAggregators.contains(i)) {
+ Writable lastAggregatedValue = masterAggregator[i].getValue();
+ if (isAbstractAggregator[i]) {
+ final AbstractAggregator<M, Vertex<V, E, M>> intern =
((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[i]);
+ final Writable finalizeAggregation = intern.finalizeAggregation();
+ if (intern.finalizeAggregation() != null) {
+ lastAggregatedValue = finalizeAggregation;
+ }
+ // this count is usually the times of active
+ // vertices in the graph
+ updatedCnt.put(aggregatorIncrementFlag[i],
+ intern.getTimesAggregated());
+ }
+ updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue);
+ }
+ }
}
- this.Aggregators = tmp;
}
/**
@@ -96,20 +190,13 @@ public final class AggregationRunner<V e
* we haven't seen any messages anymore.
*/
public boolean receiveAggregatedValues(MapWritable updatedValues,
- long iteration) {
- // In every superstep, we create a new result collection as we don't save
- // history.
- // If a value is missing, the user will take a null result. By creating a
- // new collection
- // every time, we can reduce the network cost (because we send less
- // information by skipping null values)
- // But we are losing in GC.
- this.aggregatorResults = new HashMap<String, Writable>(4);
- for (String name : this.Aggregators.keySet()) {
- this.textWrap.set(name);
- this.aggregatorResults.put(name, updatedValues.get(textWrap));
+ long iteration) throws IOException, SyncException, InterruptedException {
+ // map is the first value that is in the queue
+ for (int i = 0; i < aggregators.length; i++) {
+ globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]);
+ globalAggregatorIncrement[i] = (IntWritable) updatedValues
+ .get(aggregatorIncrementFlag[i]);
}
-
IntWritable count = (IntWritable) updatedValues
.get(GraphJobRunner.FLAG_MESSAGE_COUNTS);
if (count != null && count.get() == Integer.MIN_VALUE) {
@@ -119,16 +206,47 @@ public final class AggregationRunner<V e
}
/**
- * Method to let the custom master aggregator read messages from peers and
- * aggregate a value.
+ * @return true if aggregators were defined. Normally used by the internal
+ * stateful methods, outside shouldn't use it too extensively.
*/
- @SuppressWarnings("unchecked")
- public void masterAggregation(Text name, Writable value) {
- String nameIdx = name.toString().split(";", 2)[1];
- this.Aggregators.get(nameIdx).aggregate(null, value);
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ /**
+ * Method to let the master read messages from peers and aggregate a value.
+ */
+ public void masterReadAggregatedValue(Text textIndex, M value) {
+ int index = Integer.parseInt(textIndex.toString().split(";")[1]);
+ masterAggregator[index].aggregate(null, value);
+ }
- // When it's time to send the values, we can see which aggregators are
used.
- this.aggregatorsUsed.add(nameIdx);
+ /**
+ * Method to let the master read messages from peers and aggregate the
+ * incremental value.
+ */
+ public void masterReadAggregatedIncrementalValue(Text textIndex, M value) {
+ int index = Integer.parseInt(textIndex.toString().split(";")[1]);
+ if (isAbstractAggregator[index]) {
+ ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[index])
+ .addTimesAggregated(((IntWritable) value).get());
+ }
+ }
+
+ /**
+ * This method adds an id of an aggregator that will be skipped in the
current
+ * superstep.
+ */
+ public void addSkipAggregator(int index) {
+ this.skipAggregators.add(index);
+ }
+
+ /**
+ * This method clears the set of aggregator ids to skip, so that no
+ * aggregator is skipped anymore.
+ */
+ void resetSkipAggregators() {
+ this.skipAggregators.clear();
}
@SuppressWarnings("unchecked")
@@ -143,7 +261,13 @@ public final class AggregationRunner<V e
+ " could not be found or instantiated!");
}
- public final Writable getAggregatedValue(String name) {
- return this.aggregatorResults.get(name);
+ public final Writable getLastAggregatedValue(int index) {
+ return globalAggregatorResult[Preconditions.checkPositionIndex(index,
+ globalAggregatorResult.length)];
+ }
+
+ public final IntWritable getNumLastAggregatedVertices(int index) {
+ return globalAggregatorIncrement[Preconditions.checkPositionIndex(index,
+ globalAggregatorIncrement.length)];
}
-}
+}
\ No newline at end of file
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1560605&r1=1560604&r2=1560605&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Thu Jan
23 08:16:44 2014
@@ -107,20 +107,23 @@ public class GraphJob extends BSPJob {
}
/**
- * Custom aggregator registration. Add a custom aggregator that will
aggregate
- * massages sent from the user.
- *
- * @param name identifies an aggregator
- * @param aggregatorClass the aggregator class
+ * Set the aggregator for the job.
*/
- @SuppressWarnings("rawtypes")
- public void registerAggregator(String name,
- Class<? extends Aggregator> aggregatorClass) {
- String prevAggrs = this.conf.get(AGGREGATOR_CLASS_ATTR, "");
-
- prevAggrs += name + "@" + aggregatorClass.getName() + ";";
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public void setAggregatorClass(Class<? extends Aggregator> cls) {
+ this.setAggregatorClass(new Class[] { cls });
+ }
- this.conf.set(AGGREGATOR_CLASS_ATTR, prevAggrs);
+ /**
+ * Sets multiple aggregators for the job.
+ */
+ @SuppressWarnings("rawtypes")
+ public void setAggregatorClass(Class<? extends Aggregator>... cls) {
+ String classNames = "";
+ for (Class<? extends Aggregator> cl : cls) {
+ classNames += cl.getName() + ";";
+ }
+ conf.set(AGGREGATOR_CLASS_ATTR, classNames);
}
/**
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1560605&r1=1560604&r2=1560605&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Thu Jan 23 08:16:44 2014
@@ -63,11 +63,12 @@ public final class GraphJobRunner<V exte
// make sure that these values don't collide with the vertex names
public static final String S_FLAG_MESSAGE_COUNTS = "hama.0";
public static final String S_FLAG_AGGREGATOR_VALUE = "hama.1";
- public static final String S_FLAG_VERTEX_INCREASE = "hama.2";
- public static final String S_FLAG_VERTEX_DECREASE = "hama.3";
- public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.4";
- public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.5";
-
+ public static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2";
+ public static final String S_FLAG_VERTEX_INCREASE = "hama.3";
+ public static final String S_FLAG_VERTEX_DECREASE = "hama.4";
+ public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.5";
+ public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.6";
+ public static final String S_FLAG_AGGREGATOR_SKIP = "hama.7";
public static final Text FLAG_MESSAGE_COUNTS = new
Text(S_FLAG_MESSAGE_COUNTS);
public static final Text FLAG_VERTEX_INCREASE = new Text(
S_FLAG_VERTEX_INCREASE);
@@ -178,10 +179,22 @@ public final class GraphJobRunner<V exte
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException, SyncException, InterruptedException {
- // This run only on master
- if (isMasterTask(peer)) {
+ if (isMasterTask(peer) && iteration == 1) {
+ MapWritable updatedCnt = new MapWritable();
+ updatedCnt.put(
+ FLAG_VERTEX_TOTAL_VERTICES,
+ new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES)
+ .getCounter())));
+ // send the updates from the master tasks back to the slaves
+ for (String peerName : peer.getAllPeerNames()) {
+ peer.send(peerName, new GraphJobMessage(updatedCnt));
+ }
+ }
+
+ // this is done in every iteration after the first one
+ if (isMasterTask(peer) && iteration > 1) {
MapWritable updatedCnt = new MapWritable();
- // send total number of vertices
+ // send total number of vertices.
updatedCnt.put(
FLAG_VERTEX_TOTAL_VERTICES,
new LongWritable((peer.getCounter(GraphJobCounter.INPUT_VERTICES)
@@ -197,18 +210,26 @@ public final class GraphJobRunner<V exte
peer.send(peerName, new GraphJobMessage(updatedCnt));
}
}
-
- if (firstVertexMessage != null) {
- peer.send(peer.getPeerName(), firstVertexMessage);
+ if (getAggregationRunner().isEnabled() && iteration > 1) {
+ // in case we need to sync, we need to replay the messages that already
+ // are added to the queue. This prevents losing messages when using
+ // aggregators.
+ if (firstVertexMessage != null) {
+ peer.send(peer.getPeerName(), firstVertexMessage);
+ }
+ GraphJobMessage msg = null;
+ while ((msg = peer.getCurrentMessage()) != null) {
+ peer.send(peer.getPeerName(), msg);
+ }
+ // now sync
+ peer.sync();
+ // now the map message must be read that might be sent from the master
+ updated = getAggregationRunner().receiveAggregatedValues(
+ peer.getCurrentMessage().getMap(), iteration);
+ // set the first vertex message back to the message it had before sync
+ firstVertexMessage = peer.getCurrentMessage();
}
-
- // now sync
- peer.sync();
- // now the map message must be read that might be send from the master
- updated = getAggregationRunner().receiveAggregatedValues(
- peer.getCurrentMessage().getMap(), iteration);
- // set the first vertex message back to the message it had before sync
- firstVertexMessage = peer.getCurrentMessage();
+ this.aggregationRunner.resetSkipAggregators();
return firstVertexMessage;
}
@@ -250,12 +271,14 @@ public final class GraphJobRunner<V exte
}
if (!vertex.isHalted()) {
+ M lastValue = vertex.getValue();
if (iterable == null) {
vertex.compute(Collections.<M> emptyList());
} else {
vertex.compute(iterable);
currentMessage = iterable.getOverflowMessage();
}
+ getAggregationRunner().aggregateVertex(lastValue, vertex);
activeVertices++;
}
@@ -265,7 +288,8 @@ public final class GraphJobRunner<V exte
}
vertices.finishSuperstep();
- sendControllValues(activeVertices, this.changedVertexCnt);
+ getAggregationRunner().sendAggregatorValues(peer, activeVertices,
+ this.changedVertexCnt);
iteration++;
}
@@ -316,6 +340,7 @@ public final class GraphJobRunner<V exte
* Seed the vertices first with their own values in compute. This is the
first
* superstep after the vertices have been loaded.
*/
+ @SuppressWarnings("unused")
private void doInitialSuperstep(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
@@ -324,6 +349,7 @@ public final class GraphJobRunner<V exte
IDSkippingIterator<V, E, M> skippingIterator = vertices.skippingIterator();
while (skippingIterator.hasNext()) {
Vertex<V, E, M> vertex = skippingIterator.next();
+ M lastValue = vertex.getValue();
// Calls setup method.
vertex.setup(conf);
@@ -331,7 +357,7 @@ public final class GraphJobRunner<V exte
vertices.finishVertexComputation(vertex);
}
vertices.finishSuperstep();
- sendControllValues(1, this.changedVertexCnt);
+ getAggregationRunner().sendAggregatorValues(peer, 1,
this.changedVertexCnt);
iteration++;
}
@@ -551,10 +577,14 @@ public final class GraphJobRunner<V exte
} else {
globalUpdateCounts += ((IntWritable) e.getValue()).get();
}
-
- } else if (vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
- this.getAggregationRunner().masterAggregation(vertexID,
- e.getValue());
+ } else if (getAggregationRunner().isEnabled()
+ && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
+ getAggregationRunner().masterReadAggregatedValue(vertexID,
+ (M) e.getValue());
+ } else if (getAggregationRunner().isEnabled()
+ && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) {
+ getAggregationRunner().masterReadAggregatedIncrementalValue(
+ vertexID, (M) e.getValue());
} else if (FLAG_VERTEX_INCREASE.equals(vertexID)) {
dynamicAdditions = true;
addVertex((Vertex<V, E, M>) e.getValue());
@@ -574,9 +604,11 @@ public final class GraphJobRunner<V exte
}
}
}
+
} else {
throw new UnsupportedOperationException("Unknown message type: " +
msg);
}
+
}
// If we applied any changes to vertices, we need to call finishAdditions
@@ -620,20 +652,23 @@ public final class GraphJobRunner<V exte
}
/**
- * Runs internal aggregators and send their values to the master task.
+ * Gets the last aggregated value at the given index. The index is dependent
+ * on how the aggregators were configured during job setup phase.
*
- * @param activeVertices number of active vertices in this peer
- * @param changedVertexCnt number of added/removed vertices in a superstep
+ * @return the value of the aggregator, or null if none was defined.
*/
- private void sendControllValues(int activeVertices, int changedVertexCnt)
- throws IOException {
- // send msgCounts to the master task
- MapWritable updatedCnt = new MapWritable();
- updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(activeVertices));
- // send total number of vertices changes
- updatedCnt.put(FLAG_VERTEX_ALTER_COUNTER,
- new LongWritable(changedVertexCnt));
- peer.send(getMasterTask(peer), new GraphJobMessage(updatedCnt));
+ public final Writable getLastAggregatedValue(int index) {
+ return getAggregationRunner().getLastAggregatedValue(index);
+ }
+
+ /**
+ * Gets the last aggregated number of vertices at the given index. The index
+ * is dependent on how the aggregators were configured during job setup
phase.
+ *
+ * @return the number of aggregated vertices, or null if none was defined.
+ */
+ public final IntWritable getNumLastAggregatedVertices(int index) {
+ return getAggregationRunner().getNumLastAggregatedVertices(index);
}
/**
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1560605&r1=1560604&r2=1560605&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Thu Jan 23
08:16:44 2014
@@ -27,8 +27,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hama.HamaConfiguration;
@@ -194,6 +194,31 @@ public abstract class Vertex<V extends W
return runner.getMaxIteration();
}
+ /**
+ * Get the last aggregated value of the defined aggregator, null if nothing
+ * was configured or no result was returned. You have to supply an index, the
+ * index is defined by the order you set the aggregator classes in
+ * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
+ * so if you have a single aggregator you can retrieve it via
+ * {@link #getLastAggregatedValue}(0).
+ */
+ @SuppressWarnings("unchecked")
+ public M getLastAggregatedValue(int index) {
+ return (M) runner.getLastAggregatedValue(index);
+ }
+
+ /**
+ * Get the number of aggregated vertices in the last superstep, or null if no
+ * aggregator is available. You have to supply an index, the index is defined
+ * by the order you set the aggregator classes in
+ * {@link GraphJob#setAggregatorClass(Class...)}. Index is starting at zero,
+ * so if you have a single aggregator you can retrieve it via
+ * {@link #getNumLastAggregatedVertices}(0).
+ */
+ public IntWritable getNumLastAggregatedVertices(int index) {
+ return runner.getNumLastAggregatedVertices(index);
+ }
+
public int getNumPeers() {
return runner.getPeer().getNumPeers();
}
@@ -361,30 +386,6 @@ public abstract class Vertex<V extends W
}
- /**
- * Provides a value to the specified aggregator.
- *
- * @throws IOException
- *
- * @param name identifies an aggregator
- * @param value value to be aggregated
- */
- @Override
- public void aggregate(String name, M value) throws IOException {
- MapWritable msg = new MapWritable();
- msg.put(new Text(GraphJobRunner.S_FLAG_AGGREGATOR_VALUE + ";" + name),
- value);
-
- // Get master task peer.
- String destPeer = GraphJobRunner.getMasterTask(this.getPeer());
- runner.getPeer().send(destPeer, new GraphJobMessage(msg));
- }
-
- @Override
- public Writable getAggregatedValue(String name) {
- return this.runner.getAggregationRunner().getAggregatedValue(name);
- }
-
protected void setRunner(GraphJobRunner<V, E, M> runner) {
this.runner = runner;
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1560605&r1=1560604&r2=1560605&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
Thu Jan 23 08:16:44 2014
@@ -112,19 +112,4 @@ public interface VertexInterface<V exten
*/
public M getValue();
- /**
- * Provides a value to the specified aggregator.
- *
- * @throws IOException
- *
- * @param name identifies a aggregator
- * @param value value to be aggregated
- */
- public void aggregate(String name, M value) throws IOException;
-
- /**
- * Returns the value of the specified aggregator.
- */
- public Writable getAggregatedValue(String name);
-
}