Author: edwardyoon
Date: Thu Jan 23 08:31:02 2014
New Revision: 1560606
URL: http://svn.apache.org/r1560606
Log:
Remove the skip-aggregator feature
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java?rev=1560606&r1=1560605&r2=1560606&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java Thu Jan 23 08:31:02 2014
@@ -18,8 +18,6 @@
package org.apache.hama.graph;
import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
@@ -44,7 +42,6 @@ public final class AggregationRunner<V e
// multiple aggregator arrays
private Aggregator<M, Vertex<V, E, M>>[] aggregators;
- private Set<Integer> skipAggregators;
private Writable[] globalAggregatorResult;
private IntWritable[] globalAggregatorIncrement;
private boolean[] isAbstractAggregator;
@@ -63,7 +60,6 @@ public final class AggregationRunner<V e
this.conf = peer.getConfiguration();
String aggregatorClasses = peer.getConfiguration().get(
GraphJob.AGGREGATOR_CLASS_ATTR);
- this.skipAggregators = new HashSet<Integer>();
if (aggregatorClasses != null) {
enabled = true;
aggregatorClassNames = aggregatorClasses.split(";");
@@ -95,7 +91,8 @@ public final class AggregationRunner<V e
/**
* Runs the aggregators by sending their values to the master task.
- * @param changedVertexCnt
+ *
+ * @param changedVertexCnt
*/
public void sendAggregatorValues(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
@@ -110,22 +107,18 @@ public final class AggregationRunner<V e
// also send aggregated values to the master
if (aggregators != null) {
for (int i = 0; i < this.aggregators.length; i++) {
- if (!this.skipAggregators.contains(i)) {
- updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue());
- if (isAbstractAggregator[i]) {
- updatedCnt.put(aggregatorIncrementFlag[i],
- ((AbstractAggregator<M, Vertex<V, E, M>>) aggregators[i])
- .getTimesAggregated());
- }
+ updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue());
+ if (isAbstractAggregator[i]) {
+ updatedCnt.put(aggregatorIncrementFlag[i],
+ ((AbstractAggregator<M, Vertex<V, E, M>>) aggregators[i])
+ .getTimesAggregated());
}
}
for (int i = 0; i < aggregators.length; i++) {
- if (!this.skipAggregators.contains(i)) {
- // now create new aggregators for the next iteration
- aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
- if (GraphJobRunner.isMasterTask(peer)) {
- masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
- }
+ // now create new aggregators for the next iteration
+ aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
+ if (GraphJobRunner.isMasterTask(peer)) {
+ masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
}
}
}
@@ -143,14 +136,12 @@ public final class AggregationRunner<V e
public void aggregateVertex(M lastValue, Vertex<V, E, M> v) {
if (isEnabled()) {
for (int i = 0; i < this.aggregators.length; i++) {
- if (!this.skipAggregators.contains(i)) {
- Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
- aggregator.aggregate(v, v.getValue());
- if (isAbstractAggregator[i]) {
- AbstractAggregator<M, Vertex<V, E, M>> intern = (AbstractAggregator<M, Vertex<V, E, M>>) aggregator;
- intern.aggregate(v, lastValue, v.getValue());
- intern.aggregateInternal();
- }
+ Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
+ aggregator.aggregate(v, v.getValue());
+ if (isAbstractAggregator[i]) {
+ AbstractAggregator<M, Vertex<V, E, M>> intern = (AbstractAggregator<M, Vertex<V, E, M>>) aggregator;
+ intern.aggregate(v, lastValue, v.getValue());
+ intern.aggregateInternal();
}
}
}
@@ -164,21 +155,19 @@ public final class AggregationRunner<V e
if (isEnabled()) {
// work through the master aggregators
for (int i = 0; i < masterAggregator.length; i++) {
- if (!this.skipAggregators.contains(i)) {
- Writable lastAggregatedValue = masterAggregator[i].getValue();
- if (isAbstractAggregator[i]) {
- final AbstractAggregator<M, Vertex<V, E, M>> intern = ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[i]);
- final Writable finalizeAggregation = intern.finalizeAggregation();
- if (intern.finalizeAggregation() != null) {
- lastAggregatedValue = finalizeAggregation;
- }
- // this count is usually the times of active
- // vertices in the graph
- updatedCnt.put(aggregatorIncrementFlag[i],
- intern.getTimesAggregated());
+ Writable lastAggregatedValue = masterAggregator[i].getValue();
+ if (isAbstractAggregator[i]) {
+ final AbstractAggregator<M, Vertex<V, E, M>> intern = ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[i]);
+ final Writable finalizeAggregation = intern.finalizeAggregation();
+ if (intern.finalizeAggregation() != null) {
+ lastAggregatedValue = finalizeAggregation;
}
- updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue);
+ // this count is usually the times of active
+ // vertices in the graph
+ updatedCnt.put(aggregatorIncrementFlag[i],
+ intern.getTimesAggregated());
}
+ updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue);
}
}
}
@@ -193,9 +182,9 @@ public final class AggregationRunner<V e
long iteration) throws IOException, SyncException, InterruptedException {
// map is the first value that is in the queue
for (int i = 0; i < aggregators.length; i++) {
- globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]);
- globalAggregatorIncrement[i] = (IntWritable) updatedValues
- .get(aggregatorIncrementFlag[i]);
+ globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]);
+ globalAggregatorIncrement[i] = (IntWritable) updatedValues
+ .get(aggregatorIncrementFlag[i]);
}
IntWritable count = (IntWritable) updatedValues
.get(GraphJobRunner.FLAG_MESSAGE_COUNTS);
@@ -233,22 +222,6 @@ public final class AggregationRunner<V e
}
}
- /**
- * This method adds an id of an aggregator that will be skipped in the current
- * superstep.
- */
- public void addSkipAggregator(int index) {
- this.skipAggregators.add(index);
- }
-
- /**
- * This method adds an id of an aggregator that will be skipped in the current
- * superstep.
- */
- void resetSkipAggregators() {
- this.skipAggregators.clear();
- }
-
@SuppressWarnings("unchecked")
private Aggregator<M, Vertex<V, E, M>> getNewAggregator(String clsName) {
try {
@@ -270,4 +243,4 @@ public final class AggregationRunner<V e
return globalAggregatorIncrement[Preconditions.checkPositionIndex(index,
globalAggregatorIncrement.length)];
}
-}
\ No newline at end of file
+}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1560606&r1=1560605&r2=1560606&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Jan 23 08:31:02 2014
@@ -68,7 +68,6 @@ public final class GraphJobRunner<V exte
public static final String S_FLAG_VERTEX_DECREASE = "hama.4";
public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.5";
public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.6";
- public static final String S_FLAG_AGGREGATOR_SKIP = "hama.7";
public static final Text FLAG_MESSAGE_COUNTS = new Text(S_FLAG_MESSAGE_COUNTS);
public static final Text FLAG_VERTEX_INCREASE = new Text(
S_FLAG_VERTEX_INCREASE);
@@ -229,7 +228,6 @@ public final class GraphJobRunner<V exte
// set the first vertex message back to the message it had before sync
firstVertexMessage = peer.getCurrentMessage();
}
- this.aggregationRunner.resetSkipAggregators();
return firstVertexMessage;
}
@@ -742,7 +740,7 @@ public final class GraphJobRunner<V exte
/**
* @return the aggregationRunner
*/
- AggregationRunner<V, E, M> getAggregationRunner() {
+ public AggregationRunner<V, E, M> getAggregationRunner() {
return aggregationRunner;
}