Author: andronat
Date: Fri Oct 4 14:29:15 2013
New Revision: 1529170
URL: http://svn.apache.org/r1529170
Log:
HAMA-807: Make aggregators skip supersteps
Modified:
hama/trunk/CHANGES.txt
hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1529170&r1=1529169&r2=1529170&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Fri Oct 4 14:29:15 2013
@@ -4,16 +4,17 @@ Release 0.6.3 (unreleased changes)
NEW FEATURES
+ HAMA-807: Make aggregators skip supersteps (Anastasis Andronidis)
HAMA-800: Hama Pipes Examples (Martin Illecker)
HAMA-804: Create NeuralNetwork Example (Yexi Jiang)
HAMA-795: Implement Autoencoder based on NeuralNetwork (Yexi Jiang)
HAMA-767: Add vertex addition/removal APIs (Anastasis Andronidis via
edwardyoon)
HAMA-594: Semi-Clustering Algorithm Implementation (Renil Jeseph via
edwardyoon)
- HAMA-801: Snappy fails on Mac OS with JDK 1.7 (Anastasis Andronidis via
tommaso)
BUG FIXES
HAMA-805: Problem initializing pipes in HamaStreaming (Martin Illecker)
+ HAMA-801: Snappy fails on Mac OS with JDK 1.7 (Anastasis Andronidis via
tommaso)
HAMA-789: BspPeer launched fail because port is bound by others (Suraj
Menon via edwardyoon)
HAMA-791: Fix the problem that MultilayerPerceptron fails to learn a good
hypothesis sometimes. (Yexi Jiang)
HAMA-782: The arguments of DoubleVector.slice(int, int) method will mislead
the user. (Yexi Jiang)
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java?rev=1529170&r1=1529169&r2=1529170&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
Fri Oct 4 14:29:15 2013
@@ -18,6 +18,8 @@
package org.apache.hama.graph;
import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
@@ -42,6 +44,7 @@ public final class AggregationRunner<V e
// multiple aggregator arrays
private Aggregator<M, Vertex<V, E, M>>[] aggregators;
+ private Set<Integer> skipAggregators;
private Writable[] globalAggregatorResult;
private IntWritable[] globalAggregatorIncrement;
private boolean[] isAbstractAggregator;
@@ -60,6 +63,7 @@ public final class AggregationRunner<V e
this.conf = peer.getConfiguration();
String aggregatorClasses = peer.getConfiguration().get(
GraphJob.AGGREGATOR_CLASS_ATTR);
+ this.skipAggregators = new HashSet<Integer>();
if (aggregatorClasses != null) {
enabled = true;
aggregatorClassNames = aggregatorClasses.split(";");
@@ -106,18 +110,22 @@ public final class AggregationRunner<V e
// also send aggregated values to the master
if (aggregators != null) {
for (int i = 0; i < this.aggregators.length; i++) {
- updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue());
- if (isAbstractAggregator[i]) {
- updatedCnt.put(aggregatorIncrementFlag[i],
- ((AbstractAggregator<M, Vertex<V, E, M>>) aggregators[i])
- .getTimesAggregated());
+ if (!this.skipAggregators.contains(i)) {
+ updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue());
+ if (isAbstractAggregator[i]) {
+ updatedCnt.put(aggregatorIncrementFlag[i],
+ ((AbstractAggregator<M, Vertex<V, E, M>>) aggregators[i])
+ .getTimesAggregated());
+ }
}
}
for (int i = 0; i < aggregators.length; i++) {
- // now create new aggregators for the next iteration
- aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
- if (GraphJobRunner.isMasterTask(peer)) {
- masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
+ if (!this.skipAggregators.contains(i)) {
+ // now create new aggregators for the next iteration
+ aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
+ if (GraphJobRunner.isMasterTask(peer)) {
+ masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
+ }
}
}
}
@@ -135,12 +143,14 @@ public final class AggregationRunner<V e
public void aggregateVertex(M lastValue, Vertex<V, E, M> v) {
if (isEnabled()) {
for (int i = 0; i < this.aggregators.length; i++) {
- Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
- aggregator.aggregate(v, v.getValue());
- if (isAbstractAggregator[i]) {
- AbstractAggregator<M, Vertex<V, E, M>> intern =
(AbstractAggregator<M, Vertex<V, E, M>>) aggregator;
- intern.aggregate(v, lastValue, v.getValue());
- intern.aggregateInternal();
+ if (!this.skipAggregators.contains(i)) {
+ Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
+ aggregator.aggregate(v, v.getValue());
+ if (isAbstractAggregator[i]) {
+ AbstractAggregator<M, Vertex<V, E, M>> intern =
(AbstractAggregator<M, Vertex<V, E, M>>) aggregator;
+ intern.aggregate(v, lastValue, v.getValue());
+ intern.aggregateInternal();
+ }
}
}
}
@@ -154,19 +164,21 @@ public final class AggregationRunner<V e
if (isEnabled()) {
// work through the master aggregators
for (int i = 0; i < masterAggregator.length; i++) {
- Writable lastAggregatedValue = masterAggregator[i].getValue();
- if (isAbstractAggregator[i]) {
- final AbstractAggregator<M, Vertex<V, E, M>> intern =
((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[i]);
- final Writable finalizeAggregation = intern.finalizeAggregation();
- if (intern.finalizeAggregation() != null) {
- lastAggregatedValue = finalizeAggregation;
+ if (!this.skipAggregators.contains(i)) {
+ Writable lastAggregatedValue = masterAggregator[i].getValue();
+ if (isAbstractAggregator[i]) {
+ final AbstractAggregator<M, Vertex<V, E, M>> intern =
((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[i]);
+ final Writable finalizeAggregation = intern.finalizeAggregation();
+ if (intern.finalizeAggregation() != null) {
+ lastAggregatedValue = finalizeAggregation;
+ }
+ // this count is usually the times of active
+ // vertices in the graph
+ updatedCnt.put(aggregatorIncrementFlag[i],
+ intern.getTimesAggregated());
}
- // this count is usually the times of active
- // vertices in the graph
- updatedCnt.put(aggregatorIncrementFlag[i],
- intern.getTimesAggregated());
+ updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue);
}
- updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue);
}
}
}
@@ -181,9 +193,9 @@ public final class AggregationRunner<V e
long iteration) throws IOException, SyncException, InterruptedException {
// map is the first value that is in the queue
for (int i = 0; i < aggregators.length; i++) {
- globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]);
- globalAggregatorIncrement[i] = (IntWritable) updatedValues
- .get(aggregatorIncrementFlag[i]);
+ globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]);
+ globalAggregatorIncrement[i] = (IntWritable) updatedValues
+ .get(aggregatorIncrementFlag[i]);
}
IntWritable count = (IntWritable) updatedValues
.get(GraphJobRunner.FLAG_MESSAGE_COUNTS);
@@ -221,6 +233,22 @@ public final class AggregationRunner<V e
}
}
+ /**
+ * Marks aggregator {@code index} as skipped until the skip set is reset: it
+ * is no longer fed vertices, its value is not sent and it is not re-created.
+ */
+ public void addSkipAggregator(int index) {
+ skipAggregators.add(index);
+ }
+
+ /**
+ * Clears the set of skipped aggregators, undoing every addSkipAggregator
+ * call so that all aggregators take part in aggregation again.
+ */
+ void resetSkipAggregators() {
+ this.skipAggregators.clear();
+ }
+
@SuppressWarnings("unchecked")
private Aggregator<M, Vertex<V, E, M>> getNewAggregator(String clsName) {
try {
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1529170&r1=1529169&r2=1529170&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Fri Oct 4 14:29:15 2013
@@ -68,11 +68,13 @@ public final class GraphJobRunner<V exte
public static final String S_FLAG_VERTEX_DECREASE = "hama.4";
public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.5";
public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.6";
+ public static final String S_FLAG_AGGREGATOR_SKIP = "hama.7";
public static final Text FLAG_MESSAGE_COUNTS = new
Text(S_FLAG_MESSAGE_COUNTS);
public static final Text FLAG_VERTEX_INCREASE = new
Text(S_FLAG_VERTEX_INCREASE);
public static final Text FLAG_VERTEX_DECREASE = new
Text(S_FLAG_VERTEX_DECREASE);
public static final Text FLAG_VERTEX_ALTER_COUNTER = new
Text(S_FLAG_VERTEX_ALTER_COUNTER);
public static final Text FLAG_VERTEX_TOTAL_VERTICES = new
Text(S_FLAG_VERTEX_TOTAL_VERTICES);
+ public static final Text FLAG_AGGREGATOR_SKIP = new
Text(S_FLAG_AGGREGATOR_SKIP);
public static final String MESSAGE_COMBINER_CLASS_KEY =
"hama.vertex.message.combiner.class";
public static final String VERTEX_CLASS_KEY = "hama.graph.vertex.class";
@@ -193,14 +195,14 @@ public final class GraphJobRunner<V exte
if (globalUpdateCounts == 0) {
updatedCnt.put(FLAG_MESSAGE_COUNTS, new
IntWritable(Integer.MIN_VALUE));
} else {
- aggregationRunner.doMasterAggregation(updatedCnt);
+ getAggregationRunner().doMasterAggregation(updatedCnt);
}
// send the updates from the master tasks back to the slaves
for (String peerName : peer.getAllPeerNames()) {
peer.send(peerName, new GraphJobMessage(updatedCnt));
}
}
- if (aggregationRunner.isEnabled() && iteration > 1) {
+ if (getAggregationRunner().isEnabled() && iteration > 1) {
// in case we need to sync, we need to replay the messages that already
// are added to the queue. This prevents loosing messages when using
// aggregators.
@@ -214,11 +216,12 @@ public final class GraphJobRunner<V exte
// now sync
peer.sync();
// now the map message must be read that might be send from the master
- updated = aggregationRunner.receiveAggregatedValues(peer
+ updated = getAggregationRunner().receiveAggregatedValues(peer
.getCurrentMessage().getMap(), iteration);
// set the first vertex message back to the message it had before sync
firstVertexMessage = peer.getCurrentMessage();
}
+ this.aggregationRunner.resetSkipAggregators();
return firstVertexMessage;
}
@@ -267,7 +270,7 @@ public final class GraphJobRunner<V exte
}
currentMessage = iterable.getOverflowMessage();
}
- aggregationRunner.aggregateVertex(lastValue, vertex);
+ getAggregationRunner().aggregateVertex(lastValue, vertex);
activeVertices++;
}
@@ -277,7 +280,7 @@ public final class GraphJobRunner<V exte
}
vertices.finishSuperstep();
- aggregationRunner.sendAggregatorValues(peer, activeVertices,
this.changedVertexCnt);
+ getAggregationRunner().sendAggregatorValues(peer, activeVertices,
this.changedVertexCnt);
iteration++;
}
@@ -338,11 +341,11 @@ public final class GraphJobRunner<V exte
Vertex<V, E, M> vertex = skippingIterator.next();
M lastValue = vertex.getValue();
vertex.compute(Collections.singleton(vertex.getValue()));
- aggregationRunner.aggregateVertex(lastValue, vertex);
+ getAggregationRunner().aggregateVertex(lastValue, vertex);
vertices.finishVertexComputation(vertex);
}
vertices.finishSuperstep();
- aggregationRunner.sendAggregatorValues(peer, 1, this.changedVertexCnt);
+ getAggregationRunner().sendAggregatorValues(peer, 1,
this.changedVertexCnt);
iteration++;
}
@@ -376,8 +379,8 @@ public final class GraphJobRunner<V exte
vertexOutputWriter = (VertexOutputWriter<Writable, Writable, V, E, M>)
ReflectionUtils
.newInstance(outputWriter);
- aggregationRunner = new AggregationRunner<V, E, M>();
- aggregationRunner.setupAggregators(peer);
+ setAggregationRunner(new AggregationRunner<V, E, M>());
+ getAggregationRunner().setupAggregators(peer);
Class<? extends VerticesInfo<V, E, M>> verticesInfoClass = (Class<?
extends VerticesInfo<V, E, M>>) conf.getClass("hama.graph.vertices.info",
ListVerticesInfo.class, VerticesInfo.class);
vertices = ReflectionUtils.newInstance(verticesInfoClass);
@@ -549,13 +552,13 @@ public final class GraphJobRunner<V exte
} else {
globalUpdateCounts += ((IntWritable) e.getValue()).get();
}
- } else if (aggregationRunner.isEnabled()
+ } else if (getAggregationRunner().isEnabled()
&& vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
- aggregationRunner.masterReadAggregatedValue(vertexID,
+ getAggregationRunner().masterReadAggregatedValue(vertexID,
(M) e.getValue());
- } else if (aggregationRunner.isEnabled()
+ } else if (getAggregationRunner().isEnabled()
&& vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) {
- aggregationRunner.masterReadAggregatedIncrementalValue(vertexID,
+
getAggregationRunner().masterReadAggregatedIncrementalValue(vertexID,
(M) e.getValue());
} else if (FLAG_VERTEX_INCREASE.equals(vertexID)) {
dynamicAdditions = true;
@@ -571,6 +574,12 @@ public final class GraphJobRunner<V exte
} else {
throw new UnsupportedOperationException("A message to increase
vertex count is in a wrong place: " + peer);
}
+ } else if (FLAG_AGGREGATOR_SKIP.equals(vertexID)) {
+ if (isMasterTask(peer)) {
+ this.getAggregationRunner().addSkipAggregator(((IntWritable)
e.getValue()).get());
+ } else {
+ throw new UnsupportedOperationException("A message to skip
aggregators is in a wrong peer: " + peer);
+ }
}
}
@@ -626,7 +635,7 @@ public final class GraphJobRunner<V exte
* @return the value of the aggregator, or null if none was defined.
*/
public final Writable getLastAggregatedValue(int index) {
- return aggregationRunner.getLastAggregatedValue(index);
+ return getAggregationRunner().getLastAggregatedValue(index);
}
/**
@@ -636,7 +645,7 @@ public final class GraphJobRunner<V exte
* @return the value of the aggregator, or null if none was defined.
*/
public final IntWritable getNumLastAggregatedVertices(int index) {
- return aggregationRunner.getNumLastAggregatedVertices(index);
+ return getAggregationRunner().getNumLastAggregatedVertices(index);
}
/**
@@ -707,4 +716,18 @@ public final class GraphJobRunner<V exte
this.changedVertexCnt = changedVertexCnt;
}
+ /**
+ * @return the aggregation runner of this job; also reached by Vertex#skipAggregator
+ */
+ AggregationRunner<V, E, M> getAggregationRunner() {
+ return this.aggregationRunner;
+ }
+
+ /**
+ * @param runner replaces the runner returned by {@link #getAggregationRunner()}
+ */
+ void setAggregationRunner(AggregationRunner<V, E, M> runner) {
+ this.aggregationRunner = runner;
+ }
+
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1529170&r1=1529169&r2=1529170&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Fri Oct 4
14:29:15 2013
@@ -228,7 +228,7 @@ public abstract class Vertex<V extends W
* @return the configured partitioner instance to message vertices.
*/
public Partitioner<V, M> getPartitioner() {
- return (Partitioner<V, M>) runner.getPartitioner();
+ return runner.getPartitioner();
}
@Override
@@ -241,6 +241,21 @@ public abstract class Vertex<V extends W
this.votedToHalt = true;
}
+ /**
+ * Skips aggregator {@code index} on this peer until the skip set is reset and
+ * asks the master (via a FLAG_AGGREGATOR_SKIP message) to skip it as well;
+ * the aggregator's value returned in the next superstep will then be null.
+ * @param index position of the aggregator in the configured aggregator list
+ * @throws IOException propagated from sending the skip message to the master
+ */
+ public void skipAggregator(int index) throws IOException {
+ MapWritable skipRequest = new MapWritable();
+ skipRequest.put(GraphJobRunner.FLAG_AGGREGATOR_SKIP, new IntWritable(index));
+ this.runner.getAggregationRunner().addSkipAggregator(index);
+ String masterPeer = GraphJobRunner.getMasterTask(this.getPeer()); // only the master handles skip messages
+ runner.getPeer().send(masterPeer, new GraphJobMessage(skipRequest));
+ }
+
void setActive() {
this.votedToHalt = false;
}