Author: tjungblut
Date: Tue Sep 11 12:15:21 2012
New Revision: 1383375
URL: http://svn.apache.org/viewvc?rev=1383375&view=rev
Log:
[HAMA-597]: Split a GraphJobRunner into multiple classes Part 1
Added:
hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
Removed:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunnerBase.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1383375&r1=1383374&r2=1383375&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Tue Sep 11 12:15:21 2012
@@ -13,6 +13,7 @@ Release 0.6 (unreleased changes)
IMPROVEMENTS
+ HAMA-597: Split a GraphJobRunner into multiple classes (edwardyoon &
tjungblut)
HAMA-557: Implement Checkpointing service in Hama (surajmenon)
HAMA-587: Synchronization Client should provide API's to store and retrieve
information among peers and BSPMaster (surajmenon)
HAMA-610: Provision to define the task allocation strategy as a feature of
job. (surajmenon)
Added:
hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java?rev=1383375&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java Tue Sep 11 12:15:21 2012
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Runner class that carries out the aggregation-related work whenever
+ * aggregators have been configured for the job.
+ *
+ */
+public final class AggregationRunner<V extends Writable, E extends Writable, M extends Writable> {
+
+ // multiple aggregator arrays
+ private Aggregator<M, Vertex<V, E, M>>[] aggregators;
+ private Writable[] globalAggregatorResult;
+ private IntWritable[] globalAggregatorIncrement;
+ private boolean[] isAbstractAggregator;
+ private String[] aggregatorClassNames;
+ private Text[] aggregatorValueFlag;
+ private Text[] aggregatorIncrementFlag;
+ // aggregator on the master side
+ private Aggregator<M, Vertex<V, E, M>>[] masterAggregator;
+
+ private boolean enabled = false;
+ private Configuration conf;
+
+ @SuppressWarnings("unchecked")
+ public void setupAggregators(
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
+ this.conf = peer.getConfiguration();
+ String aggregatorClasses = peer.getConfiguration().get(
+ GraphJob.AGGREGATOR_CLASS_ATTR);
+ if (aggregatorClasses != null) {
+ enabled = true;
+ aggregatorClassNames = aggregatorClasses.split(";");
+ // init to the split size
+ aggregators = new Aggregator[aggregatorClassNames.length];
+ globalAggregatorResult = new Writable[aggregatorClassNames.length];
+ globalAggregatorIncrement = new IntWritable[aggregatorClassNames.length];
+ isAbstractAggregator = new boolean[aggregatorClassNames.length];
+ aggregatorValueFlag = new Text[aggregatorClassNames.length];
+ aggregatorIncrementFlag = new Text[aggregatorClassNames.length];
+ if (GraphJobRunner.isMasterTask(peer)) {
+ masterAggregator = new Aggregator[aggregatorClassNames.length];
+ }
+ for (int i = 0; i < aggregatorClassNames.length; i++) {
+ aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
+ aggregatorValueFlag[i] = new Text(
+ GraphJobRunner.S_FLAG_AGGREGATOR_VALUE + ";" + i);
+ aggregatorIncrementFlag[i] = new Text(
+ GraphJobRunner.S_FLAG_AGGREGATOR_INCREMENT + ";" + i);
+ if (aggregators[i] instanceof AbstractAggregator) {
+ isAbstractAggregator[i] = true;
+ }
+ if (GraphJobRunner.isMasterTask(peer)) {
+ masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
+ }
+ }
+ }
+ }
+
+ /**
+ * Runs the aggregators by sending their values to the master task.
+ */
+ public void sendAggregatorValues(
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
+ int activeVertices) throws IOException {
+ // send msgCounts to the master task
+ MapWritable updatedCnt = new MapWritable();
+ updatedCnt.put(GraphJobRunner.FLAG_MESSAGE_COUNTS, new IntWritable(
+ activeVertices));
+ // also send aggregated values to the master
+ if (aggregators != null) {
+ for (int i = 0; i < this.aggregators.length; i++) {
+ updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue());
+ if (isAbstractAggregator[i]) {
+ updatedCnt.put(aggregatorIncrementFlag[i],
+ ((AbstractAggregator<M, Vertex<V, E, M>>) aggregators[i])
+ .getTimesAggregated());
+ }
+ }
+ for (int i = 0; i < aggregators.length; i++) {
+ // now create new aggregators for the next iteration
+ aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
+ if (GraphJobRunner.isMasterTask(peer)) {
+ masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
+ }
+ }
+ }
+ peer.send(GraphJobRunner.getMasterTask(peer), new GraphJobMessage(
+ updatedCnt));
+ }
+
+ /**
+ * Aggregates the last value before computation and the value after the
+ * computation.
+ *
+ * @param lastValue the value before compute().
+ * @param v the vertex.
+ */
+ public void aggregateVertex(M lastValue, Vertex<V, E, M> v) {
+ if (isEnabled()) {
+ for (int i = 0; i < this.aggregators.length; i++) {
+ Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
+ aggregator.aggregate(v, v.getValue());
+ if (isAbstractAggregator[i]) {
+        AbstractAggregator<M, Vertex<V, E, M>> intern = (AbstractAggregator<M, Vertex<V, E, M>>) aggregator;
+ intern.aggregate(v, lastValue, v.getValue());
+ intern.aggregateInternal();
+ }
+ }
+ }
+ }
+
+ /**
+   * Run by the master task: globally aggregates the values received from each
+   * peer and updates the given map accordingly.
+ */
+ public void doMasterAggregation(MapWritable updatedCnt) {
+ if (isEnabled()) {
+ // work through the master aggregators
+ for (int i = 0; i < masterAggregator.length; i++) {
+ Writable lastAggregatedValue = masterAggregator[i].getValue();
+ if (isAbstractAggregator[i]) {
+        final AbstractAggregator<M, Vertex<V, E, M>> intern = ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[i]);
+ final Writable finalizeAggregation = intern.finalizeAggregation();
+ if (intern.finalizeAggregation() != null) {
+ lastAggregatedValue = finalizeAggregation;
+ }
+          // this count is usually the number of active
+          // vertices in the graph
+ updatedCnt.put(aggregatorIncrementFlag[i],
+ intern.getTimesAggregated());
+ }
+ updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue);
+ }
+ }
+ }
+
+ /**
+   * Receives the aggregated values from the master task by doing an additional
+   * barrier sync and parsing the messages.
+   *
+   * @return always true if no aggregators are defined; false once the
+   *         aggregators report that no messages have been seen anymore.
+ */
+ public boolean receiveAggregatedValues(
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
+ long iteration) throws IOException, SyncException, InterruptedException {
+ // if we have an aggregator defined, we must make an additional sync
+ // to have the updated values available on all our peers.
+ if (isEnabled() && iteration > 1) {
+ peer.sync();
+
+ MapWritable updatedValues = peer.getCurrentMessage().getMap();
+ for (int i = 0; i < aggregators.length; i++) {
+ globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]);
+ globalAggregatorIncrement[i] = (IntWritable) updatedValues
+ .get(aggregatorIncrementFlag[i]);
+ }
+ IntWritable count = (IntWritable) updatedValues
+ .get(GraphJobRunner.FLAG_MESSAGE_COUNTS);
+ if (count != null && count.get() == Integer.MIN_VALUE) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+   * @return true if aggregators were defined. Normally used by the internal
+   *         stateful methods; outside callers should not rely on it too heavily.
+ */
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ /**
+ * Method to let the master read messages from peers and aggregate a value.
+ */
+ public void masterReadAggregatedValue(Text textIndex, M value) {
+ int index = Integer.parseInt(textIndex.toString().split(";")[1]);
+ masterAggregator[index].aggregate(null, value);
+ }
+
+ /**
+ * Method to let the master read messages from peers and aggregate the
+ * incremental value.
+ */
+ public void masterReadAggregatedIncrementalValue(Text textIndex, M value) {
+ int index = Integer.parseInt(textIndex.toString().split(";")[1]);
+ if (isAbstractAggregator[index]) {
+ ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[index])
+ .addTimesAggregated(((IntWritable) value).get());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private Aggregator<M, Vertex<V, E, M>> getNewAggregator(String clsName) {
+ try {
+ return (Aggregator<M, Vertex<V, E, M>>) ReflectionUtils.newInstance(
+ conf.getClassByName(clsName), conf);
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ throw new IllegalArgumentException("Aggregator class " + clsName
+ + " could not be found or instantiated!");
+ }
+
+ public final Writable getLastAggregatedValue(int index) {
+ return globalAggregatorResult[Preconditions.checkPositionIndex(index,
+ globalAggregatorResult.length)];
+ }
+
+ public final IntWritable getNumLastAggregatedVertices(int index) {
+ return globalAggregatorIncrement[Preconditions.checkPositionIndex(index,
+ globalAggregatorIncrement.length)];
+ }
+}
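
A minimal aggregator sketch, for illustration only: it assumes nothing beyond
the two methods the runner above actually calls, aggregate(vertex, value) and
getValue(). The class name and the Writable types used here (DoubleWritable,
Text, NullWritable) are illustrative and not part of this patch.

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hama.graph.Aggregator;
    import org.apache.hama.graph.Vertex;

    public class DoubleSumAggregator implements
        Aggregator<DoubleWritable, Vertex<Text, NullWritable, DoubleWritable>> {

      private double sum = 0.0;

      @Override
      public void aggregate(Vertex<Text, NullWritable, DoubleWritable> vertex,
          DoubleWritable value) {
        // accumulate the value reported for each vertex in this superstep
        sum += value.get();
      }

      @Override
      public DoubleWritable getValue() {
        // partial sum of this peer; the master task combines the peer results
        return new DoubleWritable(sum);
      }
    }

Such a class would be registered under GraphJob.AGGREGATOR_CLASS_ATTR (several
class names joined by ';'), which is exactly the string setupAggregators()
splits on; getNewAggregator() then re-instantiates it for every superstep via
ReflectionUtils.
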
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1383375&r1=1383374&r2=1383375&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Tue Sep 11 12:15:21 2012
@@ -135,7 +135,7 @@ public final class GraphJobMessage imple
map = new MapWritable();
map.readFields(in);
} else if (isPartitioningMessage()) {
- Vertex<Writable, Writable, Writable> vertex = GraphJobRunnerBase
+ Vertex<Writable, Writable, Writable> vertex = GraphJobRunner
.newVertexInstance(VERTEX_CLASS, null);
Writable vertexId = ReflectionUtils.newInstance(VERTEX_ID_CLASS, null);
vertexId.readFields(in);
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1383375&r1=1383374&r2=1383375&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Tue Sep 11 12:15:21 2012
@@ -18,20 +18,30 @@
package org.apache.hama.graph;
import java.io.IOException;
-import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.Partitioner;
import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.util.KeyValuePair;
/**
* Fully generic graph job runner.
@@ -41,17 +51,177 @@ import org.apache.hama.bsp.sync.SyncExce
* @param <M> the value type of a vertex.
*/
public final class GraphJobRunner<V extends Writable, E extends Writable, M extends Writable>
- extends GraphJobRunnerBase<V, E, M> {
+ extends BSP<Writable, Writable, Writable, Writable, GraphJobMessage> {
+
+ private static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
+
+ // make sure that these values don't collide with the vertex names
+ public static final String S_FLAG_MESSAGE_COUNTS = "hama.0";
+ public static final String S_FLAG_AGGREGATOR_VALUE = "hama.1";
+ public static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2";
+  public static final Text FLAG_MESSAGE_COUNTS = new Text(S_FLAG_MESSAGE_COUNTS);
+
+  public static final String MESSAGE_COMBINER_CLASS = "hama.vertex.message.combiner.class";
+ public static final String GRAPH_REPAIR = "hama.graph.repair";
+
+ private Configuration conf;
+ private Combiner<M> combiner;
+ private Partitioner<V, M> partitioner;
+
+ private Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, M>>();
+
+ private boolean updated = true;
+ private int globalUpdateCounts = 0;
+
+ private long numberVertices = 0;
+ // -1 is deactivated
+ private int maxIteration = -1;
+ private long iteration;
+
+ private Class<V> vertexIdClass;
+ private Class<M> vertexValueClass;
+ private Class<E> edgeValueClass;
+ private Class<Vertex<V, E, M>> vertexClass;
+
+ private AggregationRunner<V, E, M> aggregationRunner;
+
+  private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
@Override
- @SuppressWarnings("unchecked")
public final void setup(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException, SyncException, InterruptedException {
+
+ setupFields(peer);
+
+ loadVertices(peer);
+
+ countGlobalVertexCount(peer);
+
+ doInitialSuperstep(peer);
+
+ }
+
+ @Override
+ public final void bsp(
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
+ throws IOException, SyncException, InterruptedException {
+
+ // we do supersteps while we still have updates and have not reached our
+ // maximum iterations yet
+ while (updated && !((maxIteration > 0) && iteration > maxIteration)) {
+ // reset the global update counter from our master in every superstep
+ globalUpdateCounts = 0;
+ peer.sync();
+
+ // note that the messages must be parsed here
+ final Map<V, List<M>> messages = parseMessages(peer);
+ // master needs to update
+ doMasterUpdates(peer);
+ // if aggregators say we don't have updates anymore, break
+ if (!aggregationRunner.receiveAggregatedValues(peer, iteration)) {
+ break;
+ }
+ // loop over vertices and do their computation
+ doSuperstep(messages, peer);
+ }
+ }
+
+ /**
+   * Just writes the <ID as Writable, Value as Writable> pair as a result. Note
+   * that this will also be executed when a failure happens.
+ */
+ @Override
+ public final void cleanup(
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
+ throws IOException {
+ for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) {
+ peer.write(e.getValue().getVertexID(), e.getValue().getValue());
+ }
+ }
+
+ /**
+   * The master task checks the number of updated vertices and does the master
+   * aggregation. When no aggregators are defined, a sync is saved by reading
+   * multiple typed messages.
+ */
+ private void doMasterUpdates(
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
+ throws IOException {
+ if (isMasterTask(peer) && iteration > 1) {
+ MapWritable updatedCnt = new MapWritable();
+ // exit if there's no update made
+ if (globalUpdateCounts == 0) {
+        updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(Integer.MIN_VALUE));
+ } else {
+ aggregationRunner.doMasterAggregation(updatedCnt);
+ }
+      // send the updates from the master task back to the slaves
+ for (String peerName : peer.getAllPeerNames()) {
+ peer.send(peerName, new GraphJobMessage(updatedCnt));
+ }
+ }
+ }
+
+ /**
+ * Do the main logic of a superstep, namely checking if vertices are active,
+ * feeding compute with messages and controlling combiners/aggregators.
+ */
+ private void doSuperstep(Map<V, List<M>> messages,
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
+ throws IOException {
+ int activeVertices = 0;
+ for (Vertex<V, E, M> vertex : vertices.values()) {
+ List<M> msgs = messages.get(vertex.getVertexID());
+ // If there are newly received messages, restart.
+ if (vertex.isHalted() && msgs != null) {
+ vertex.setActive();
+ }
+ if (msgs == null) {
+ msgs = Collections.emptyList();
+ }
+
+ if (!vertex.isHalted()) {
+ if (combiner != null) {
+ M combined = combiner.combine(msgs);
+ msgs = new ArrayList<M>();
+ msgs.add(combined);
+ }
+ M lastValue = vertex.getValue();
+ vertex.compute(msgs.iterator());
+ aggregationRunner.aggregateVertex(lastValue, vertex);
+ if (!vertex.isHalted()) {
+ activeVertices++;
+ }
+ }
+ }
+
+ aggregationRunner.sendAggregatorValues(peer, activeVertices);
+ iteration++;
+ }
+
+ /**
+   * Seed the vertices first with their own values in compute. This is the
+   * first superstep after the vertices have been loaded.
+ */
+ private void doInitialSuperstep(
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
+ throws IOException {
+ for (Vertex<V, E, M> vertex : vertices.values()) {
+ List<M> singletonList = Collections.singletonList(vertex.getValue());
+ M lastValue = vertex.getValue();
+ vertex.compute(singletonList.iterator());
+ aggregationRunner.aggregateVertex(lastValue, vertex);
+ }
+ aggregationRunner.sendAggregatorValues(peer, 1);
+ iteration++;
+ }
+
+ @SuppressWarnings("unchecked")
+ private void setupFields(
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
this.peer = peer;
this.conf = peer.getConfiguration();
- // Choose one as a master to collect global updates
- this.masterTask = peer.getPeerName(0);
maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration",
-1);
@@ -65,14 +235,12 @@ public final class GraphJobRunner<V exte
vertexClass = (Class<Vertex<V, E, M>>) conf.getClass(
"hama.graph.vertex.class", Vertex.class);
+ // set the classes statically, so we can save memory per message
GraphJobMessage.VERTEX_ID_CLASS = vertexIdClass;
GraphJobMessage.VERTEX_VALUE_CLASS = vertexValueClass;
GraphJobMessage.VERTEX_CLASS = vertexClass;
GraphJobMessage.EDGE_VALUE_CLASS = edgeValueClass;
- boolean repairNeeded = conf.getBoolean(GRAPH_REPAIR, false);
- boolean runtimePartitioning = conf.getBoolean(
- GraphJob.VERTEX_GRAPH_RUNTIME_PARTIONING, true);
partitioner = (Partitioner<V, M>) ReflectionUtils.newInstance(
conf.getClass("bsp.input.partitioner.class", HashPartitioner.class),
conf);
@@ -85,195 +253,429 @@ public final class GraphJobRunner<V exte
conf.getClass("hama.vertex.message.combiner.class", Combiner.class),
conf);
}
- String aggregatorClasses = conf.get(GraphJob.AGGREGATOR_CLASS_ATTR);
- if (aggregatorClasses != null) {
- LOG.debug("Aggregator classes: " + aggregatorClasses);
- aggregatorClassNames = aggregatorClasses.split(";");
- // init to the split size
- aggregators = new Aggregator[aggregatorClassNames.length];
- globalAggregatorResult = new Writable[aggregatorClassNames.length];
- globalAggregatorIncrement = new IntWritable[aggregatorClassNames.length];
- isAbstractAggregator = new boolean[aggregatorClassNames.length];
- aggregatorValueFlag = new Text[aggregatorClassNames.length];
- aggregatorIncrementFlag = new Text[aggregatorClassNames.length];
- if (isMasterTask(peer)) {
- masterAggregator = new Aggregator[aggregatorClassNames.length];
- }
- for (int i = 0; i < aggregatorClassNames.length; i++) {
- aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
- aggregatorValueFlag[i] = new Text(S_FLAG_AGGREGATOR_VALUE + ";" + i);
- aggregatorIncrementFlag[i] = new Text(S_FLAG_AGGREGATOR_INCREMENT + ";"
- + i);
- if (aggregators[i] instanceof AbstractAggregator) {
- isAbstractAggregator[i] = true;
- }
- if (isMasterTask(peer)) {
- masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
- }
- }
- }
-    VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
+ aggregationRunner = new AggregationRunner<V, E, M>();
+ aggregationRunner.setupAggregators(peer);
+ }
+
+ /**
+ * Loads vertices into memory of each peer. TODO this needs to be simplified.
+ */
+ @SuppressWarnings("unchecked")
+ private void loadVertices(
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
+ throws IOException, SyncException, InterruptedException {
+
+ /*
+ * Several partitioning constants begin
+ */
+
+    final VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
.newInstance(conf.getClass(GraphJob.VERTEX_GRAPH_INPUT_READER,
VertexInputReader.class), conf);
- loadVertices(peer, repairNeeded, runtimePartitioning, partitioner, reader,
- this);
+ final boolean repairNeeded = conf.getBoolean(GRAPH_REPAIR, false);
+ final boolean runtimePartitioning = conf.getBoolean(
+ GraphJob.VERTEX_GRAPH_RUNTIME_PARTIONING, true);
- for (String peerName : peer.getAllPeerNames()) {
-      peer.send(peerName, new GraphJobMessage(new IntWritable(vertices.size())));
- }
+ final long splitSize = peer.getSplitSize();
+ final int partitioningSteps = partitionMultiSteps(peer, splitSize);
+ final long interval = splitSize / partitioningSteps;
+
+    final boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
+
+ /*
+ * Several partitioning constants end
+ */
+
+ LOG.debug("vertex class: " + vertexClass);
+ Vertex<V, E, M> vertex = newVertexInstance(vertexClass, conf);
+ vertex.runner = this;
+
+ long startPos = peer.getPos();
+ if (startPos == 0)
+ startPos = 1L;
+
+ KeyValuePair<Writable, Writable> next = null;
+ int steps = 1;
+ while ((next = peer.readNext()) != null) {
+ boolean vertexFinished = reader.parseVertex(next.getKey(),
+ next.getValue(), vertex);
+ if (!vertexFinished) {
+ continue;
+ }
- peer.sync();
+ if (vertex.getEdges() == null) {
+ if (selfReference) {
+ vertex.setEdges(Collections.singletonList(new Edge<V, E>(vertex
+ .getVertexID(), null)));
+ } else {
+ vertex.setEdges(Collections.EMPTY_LIST);
+ }
+ }
- GraphJobMessage msg = null;
- while ((msg = peer.getCurrentMessage()) != null) {
- if (msg.isVerticesSizeMessage()) {
- numberVertices += msg.getVerticesSize().get();
+ if (selfReference) {
+ vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
}
- }
- // TODO refactor this to a single step
- for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) {
- LinkedList<M> msgIterator = new LinkedList<M>();
- Vertex<V, E, M> v = e.getValue();
- msgIterator.add(v.getValue());
- M lastValue = v.getValue();
- v.compute(msgIterator.iterator());
- if (this.aggregators != null) {
- for (int i = 0; i < this.aggregators.length; i++) {
- Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
- aggregator.aggregate(v, v.getValue());
- if (isAbstractAggregator[i]) {
-          AbstractAggregator<M, Vertex<V, E, M>> intern = (AbstractAggregator<M, Vertex<V, E, M>>) aggregator;
- intern.aggregate(v, lastValue, v.getValue());
- intern.aggregateInternal();
+ if (runtimePartitioning) {
+ int partition = partitioner.getPartition(vertex.getVertexID(),
+ vertex.getValue(), peer.getNumPeers());
+ peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
+ } else {
+ vertex.setup(conf);
+ vertices.put(vertex.getVertexID(), vertex);
+ }
+ vertex = newVertexInstance(vertexClass, conf);
+ vertex.runner = this;
+
+ if (runtimePartitioning) {
+        if (steps < partitioningSteps && (peer.getPos() - startPos) >= interval) {
+ peer.sync();
+ steps++;
+ GraphJobMessage msg = null;
+ while ((msg = peer.getCurrentMessage()) != null) {
+ Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
+ messagedVertex.runner = this;
+ messagedVertex.setup(conf);
+ vertices.put(messagedVertex.getVertexID(), messagedVertex);
}
+ startPos = peer.getPos();
}
}
}
- runAggregators(peer, 1);
- iteration++;
- }
- @Override
- public final void bsp(
- BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
- throws IOException, SyncException, InterruptedException {
+ if (runtimePartitioning) {
+ peer.sync();
- while (updated && !((maxIteration > 0) && iteration > maxIteration)) {
- globalUpdateCounts = 0;
+ GraphJobMessage msg = null;
+ while ((msg = peer.getCurrentMessage()) != null) {
+ Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
+ messagedVertex.runner = this;
+ messagedVertex.setup(conf);
+ vertices.put(messagedVertex.getVertexID(), messagedVertex);
+ }
+ }
+ LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps.");
+
+ /*
+   * If the user wants to repair the graph, we traverse the local chunk of the
+   * adjacency list and message the corresponding peer to check whether each
+   * referenced vertex exists. In real life these may be dead-end vertices,
+   * since we have no information about their outgoing edges. Mainly this
+   * procedure is there to prevent NullPointerExceptions from happening.
+ */
+ if (repairNeeded) {
+ LOG.debug("Starting repair of this graph!");
+
+ int multiSteps = 0;
+ MapWritable ssize = new MapWritable();
+ ssize.put(new IntWritable(peer.getPeerIndex()),
+ new IntWritable(vertices.size()));
+ peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
+ ssize = null;
peer.sync();
- // Map <vertexID, messages>
- final Map<V, LinkedList<M>> messages = parseMessages(peer);
- // use iterations here, since repair can skew the number of
- // supersteps
- if (isMasterTask(peer) && iteration > 1) {
- MapWritable updatedCnt = new MapWritable();
- // exit if there's no update made
- if (globalUpdateCounts == 0) {
- updatedCnt.put(FLAG_MESSAGE_COUNTS,
- new IntWritable(Integer.MIN_VALUE));
- } else {
- if (aggregators != null) {
- // work through the master aggregators
- for (int i = 0; i < masterAggregator.length; i++) {
- Writable lastAggregatedValue = masterAggregator[i].getValue();
- if (isAbstractAggregator[i]) {
-            final AbstractAggregator<M, Vertex<V, E, M>> intern = ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[i]);
- final Writable finalizeAggregation = intern
- .finalizeAggregation();
- if (intern.finalizeAggregation() != null) {
- lastAggregatedValue = finalizeAggregation;
- }
- // this count is usually the times of active
- // vertices in the graph
- updatedCnt.put(aggregatorIncrementFlag[i],
- intern.getTimesAggregated());
- }
- updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue);
+ if (isMasterTask(peer)) {
+ int minVerticesSize = Integer.MAX_VALUE;
+ GraphJobMessage received = null;
+ while ((received = peer.getCurrentMessage()) != null) {
+ MapWritable x = received.getMap();
+ for (Entry<Writable, Writable> e : x.entrySet()) {
+ int curr = ((IntWritable) e.getValue()).get();
+ if (minVerticesSize > curr) {
+ minVerticesSize = curr;
}
}
}
+
+ if (minVerticesSize < (partitioningSteps * 2)) {
+ multiSteps = minVerticesSize;
+ } else {
+ multiSteps = (partitioningSteps * 2);
+ }
+
for (String peerName : peer.getAllPeerNames()) {
- peer.send(peerName, new GraphJobMessage(updatedCnt));
+ MapWritable temp = new MapWritable();
+ temp.put(new Text("steps"), new IntWritable(multiSteps));
+ peer.send(peerName, new GraphJobMessage(temp));
}
}
- // if we have an aggregator defined, we must make an additional sync
- // to have the updated values available on all our peers.
- if (aggregators != null && iteration > 1) {
- peer.sync();
-
- MapWritable updatedValues = peer.getCurrentMessage().getMap();
- for (int i = 0; i < aggregators.length; i++) {
-          globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]);
- globalAggregatorIncrement[i] = (IntWritable) updatedValues
- .get(aggregatorIncrementFlag[i]);
- }
- IntWritable count = (IntWritable) updatedValues
- .get(FLAG_MESSAGE_COUNTS);
- if (count != null && count.get() == Integer.MIN_VALUE) {
- updated = false;
- break;
- }
+ peer.sync();
+
+ GraphJobMessage received = peer.getCurrentMessage();
+ MapWritable x = received.getMap();
+ for (Entry<Writable, Writable> e : x.entrySet()) {
+ multiSteps = ((IntWritable) e.getValue()).get();
}
- int activeVertices = 0;
- for (Vertex<V, E, M> vertex : vertices.values()) {
- LinkedList<M> msgs = messages.get(vertex.getVertexID());
- // If there are newly received messages, restart.
- if (vertex.isHalted() && msgs != null) {
- vertex.setActive();
- }
- if (msgs == null) {
- msgs = new LinkedList<M>();
- }
+ Set<V> keys = vertices.keySet();
+ Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
- if (!vertex.isHalted()) {
- if (combiner != null) {
- M combined = combiner.combine(msgs);
- msgs = new LinkedList<M>();
- msgs.add(combined);
- }
- M lastValue = vertex.getValue();
- vertex.compute(msgs.iterator());
+ int i = 0;
+ int syncs = 0;
+ for (V v : keys) {
+ Vertex<V, E, M> vertex2 = vertices.get(v);
+ for (Edge<V, E> e : vertices.get(v).getEdges()) {
+ peer.send(vertex2.getDestinationPeerName(e),
+ new GraphJobMessage(e.getDestinationVertexID()));
+ }
- if (aggregators != null) {
- if (this.aggregators != null) {
- for (int i = 0; i < this.aggregators.length; i++) {
-            Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
- aggregator.aggregate(vertex, vertex.getValue());
- if (isAbstractAggregator[i]) {
-              AbstractAggregator<M, Vertex<V, E, M>> intern = ((AbstractAggregator<M, Vertex<V, E, M>>) aggregator);
- intern.aggregate(vertex, lastValue, vertex.getValue());
- intern.aggregateInternal();
- }
+ if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) {
+ peer.sync();
+ syncs++;
+ GraphJobMessage msg = null;
+ while ((msg = peer.getCurrentMessage()) != null) {
+ V vertexName = (V) msg.getVertexId();
+ if (!vertices.containsKey(vertexName)) {
+ Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
+ newVertex.setVertexID(vertexName);
+ newVertex.runner = this;
+ if (selfReference) {
+ newVertex.setEdges(Collections.singletonList(new Edge<V, E>(
+ newVertex.getVertexID(), null)));
+ } else {
+ newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
}
+ newVertex.setup(conf);
+ tmp.put(vertexName, newVertex);
}
}
- if (!vertex.isHalted()) {
- activeVertices++;
+ }
+ i++;
+ }
+
+ peer.sync();
+ GraphJobMessage msg = null;
+ while ((msg = peer.getCurrentMessage()) != null) {
+ V vertexName = (V) msg.getVertexId();
+ if (!vertices.containsKey(vertexName)) {
+ Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
+ newVertex.setVertexID(vertexName);
+ newVertex.runner = this;
+ if (selfReference) {
+ newVertex.setEdges(Collections.singletonList(new Edge<V, E>(
+ newVertex.getVertexID(), null)));
+ } else {
+ newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
}
+ newVertex.setup(conf);
+ vertices.put(vertexName, newVertex);
+ newVertex = null;
}
}
- runAggregators(peer, activeVertices);
- iteration++;
+ for (Map.Entry<V, Vertex<V, E, M>> e : tmp.entrySet()) {
+ vertices.put(e.getKey(), e.getValue());
+ }
+ tmp.clear();
}
+
+ LOG.debug("Starting Vertex processing!");
}
/**
- * Just write <ID as Writable, Value as Writable> pair as a result. Note that
- * this will also be executed when failure happened.
+ * Partitions our vertices through multiple supersteps to save memory.
*/
- @Override
- public final void cleanup(
+ private int partitionMultiSteps(
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
+ long splitSize) throws IOException, SyncException, InterruptedException {
+ int multiSteps = 1;
+
+ MapWritable ssize = new MapWritable();
+ ssize
+        .put(new IntWritable(peer.getPeerIndex()), new LongWritable(splitSize));
+ peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
+ ssize = null;
+ peer.sync();
+
+ if (isMasterTask(peer)) {
+ long maxSplitSize = 0L;
+ GraphJobMessage received = null;
+ while ((received = peer.getCurrentMessage()) != null) {
+ MapWritable x = received.getMap();
+ for (Entry<Writable, Writable> e : x.entrySet()) {
+ long curr = ((LongWritable) e.getValue()).get();
+ if (maxSplitSize < curr) {
+ maxSplitSize = curr;
+ }
+ }
+ }
+
+ int steps = (int) (maxSplitSize / conf.getInt( // 20 mb
+ "hama.graph.multi.step.partitioning.interval", 20000000)) + 1;
+
+ for (String peerName : peer.getAllPeerNames()) {
+ MapWritable temp = new MapWritable();
+ temp.put(new Text("max"), new IntWritable(steps));
+ peer.send(peerName, new GraphJobMessage(temp));
+ }
+ }
+ peer.sync();
+
+ GraphJobMessage received = peer.getCurrentMessage();
+ MapWritable x = received.getMap();
+ for (Entry<Writable, Writable> e : x.entrySet()) {
+ multiSteps = ((IntWritable) e.getValue()).get();
+ }
+ LOG.info(peer.getPeerName() + ": " + multiSteps);
+ return multiSteps;
+ }
+
+ /**
+   * Counts vertices globally by sending the count of vertices in the map to
+   * the other peers.
+ */
+ private void countGlobalVertexCount(
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
+ throws IOException, SyncException, InterruptedException {
+ for (String peerName : peer.getAllPeerNames()) {
+      peer.send(peerName, new GraphJobMessage(new IntWritable(vertices.size())));
+ }
+
+ peer.sync();
+
+ GraphJobMessage msg = null;
+ while ((msg = peer.getCurrentMessage()) != null) {
+ if (msg.isVerticesSizeMessage()) {
+ numberVertices += msg.getVerticesSize().get();
+ }
+ }
+ }
+
+ /**
+ * Parses the messages in every superstep and does actions according to flags
+ * in the messages.
+ *
+   * @return a map that contains the messages per vertex.
+ */
+ @SuppressWarnings("unchecked")
+ private Map<V, List<M>> parseMessages(
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
- for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) {
- peer.write(e.getValue().getVertexID(), e.getValue().getValue());
+ GraphJobMessage msg = null;
+ final Map<V, List<M>> msgMap = new HashMap<V, List<M>>();
+ while ((msg = peer.getCurrentMessage()) != null) {
+ // either this is a vertex message or a directive that must be read
+ // as map
+ if (msg.isVertexMessage()) {
+ final V vertexID = (V) msg.getVertexId();
+ final M value = (M) msg.getVertexValue();
+ List<M> msgs = msgMap.get(vertexID);
+ if (msgs == null) {
+ msgs = new ArrayList<M>();
+ msgMap.put(vertexID, msgs);
+ }
+ msgs.add(value);
+ } else if (msg.isMapMessage()) {
+ for (Entry<Writable, Writable> e : msg.getMap().entrySet()) {
+ Text vertexID = (Text) e.getKey();
+ if (FLAG_MESSAGE_COUNTS.equals(vertexID)) {
+ if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) {
+ updated = false;
+ } else {
+ globalUpdateCounts += ((IntWritable) e.getValue()).get();
+ }
+ } else if (aggregationRunner.isEnabled()
+ && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
+ aggregationRunner.masterReadAggregatedValue(vertexID,
+ (M) e.getValue());
+ } else if (aggregationRunner.isEnabled()
+ && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) {
+ aggregationRunner.masterReadAggregatedIncrementalValue(vertexID,
+ (M) e.getValue());
+ }
+ }
+ } else {
+        throw new UnsupportedOperationException("Unknown message type: " + msg);
+ }
+
}
+ return msgMap;
+ }
+
+ /**
+ * @return the number of vertices, globally accumulated.
+ */
+ public final long getNumberVertices() {
+ return numberVertices;
+ }
+
+ /**
+ * @return the current number of iterations.
+ */
+ public final long getNumberIterations() {
+ return iteration;
+ }
+
+ /**
+ * @return the defined number of maximum iterations, -1 if not defined.
+ */
+ public final int getMaxIteration() {
+ return maxIteration;
+ }
+
+ /**
+ * @return the defined partitioner instance.
+ */
+ public final Partitioner<V, M> getPartitioner() {
+ return partitioner;
+ }
+
+ /**
+   * Gets the last aggregated value at the given index. The index is dependent
+   * on how the aggregators were configured during the job setup phase.
+ *
+ * @return the value of the aggregator, or null if none was defined.
+ */
+ public final Writable getLastAggregatedValue(int index) {
+ return aggregationRunner.getLastAggregatedValue(index);
+ }
+
+ /**
+   * Gets the last aggregated number of vertices at the given index. The index
+   * is dependent on how the aggregators were configured during the job setup
+   * phase.
+ *
+ * @return the value of the aggregator, or null if none was defined.
+ */
+ public final IntWritable getNumLastAggregatedVertices(int index) {
+ return aggregationRunner.getNumLastAggregatedVertices(index);
+ }
+
+ /**
+ * @return the peer instance.
+ */
+  public final BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> getPeer() {
+ return peer;
+ }
+
+ /**
+ * Checks if this is a master task. The master task is the first peer in the
+ * peer array.
+ */
+ public static boolean isMasterTask(
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
+ return peer.getPeerName().equals(getMasterTask(peer));
+ }
+
+ /**
+ * @return the name of the master peer, the name at the first index of the
+ * peers.
+ */
+ public static String getMasterTask(
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
+ return peer.getPeerName(0);
+ }
+
+ /**
+ * @return a new vertex instance
+ */
+  public static <V extends Writable, E extends Writable, M extends Writable> Vertex<V, E, M> newVertexInstance(
+ Class<?> vertexClass, Configuration conf) {
+ @SuppressWarnings("unchecked")
+ Vertex<V, E, M> vertex = (Vertex<V, E, M>) ReflectionUtils.newInstance(
+ vertexClass, conf);
+ return vertex;
}
}
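
To make the runner's contract concrete, here is a minimal vertex sketch. The
runner first feeds compute() with the vertex's own value (doInitialSuperstep)
and in later supersteps with the parsed, possibly combined, messages. The
setValue() and voteToHalt() calls are assumed to exist on Vertex (only
getValue(), isHalted() and setActive() appear in this patch); the class itself
is purely illustrative.

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hama.graph.Vertex;

    public class MaxValueVertex extends Vertex<Text, NullWritable, IntWritable> {

      @Override
      public void compute(Iterator<IntWritable> messages) throws IOException {
        // start from our current value; in the first superstep the only
        // "message" is that value itself, seeded by doInitialSuperstep()
        int max = getValue().get();
        while (messages.hasNext()) {
          max = Math.max(max, messages.next().get());
        }
        setValue(new IntWritable(max)); // assumed setter, see note above
        voteToHalt();                   // assumed halt call, see note above
      }
    }

A vertex that halts without sending messages stays inactive; once every peer
reports zero active vertices, doMasterUpdates() broadcasts Integer.MIN_VALUE
under FLAG_MESSAGE_COUNTS and the main bsp() loop terminates.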