Author: edwardyoon
Date: Thu Aug 30 06:19:50 2012
New Revision: 1378811
URL: http://svn.apache.org/viewvc?rev=1378811&view=rev
Log:
Improvement of network-based runtime partitioner
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1378811&r1=1378810&r2=1378811&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Aug 30 06:19:50 2012
@@ -12,6 +12,8 @@ Release 0.6 (unreleased changes)
IMPROVEMENTS
+ HAMA-599: Improvement of network-based runtime partitioner (edwardyoon)
+
Release 0.5 - April 10, 2012
NEW FEATURES
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1378811&r1=1378810&r2=1378811&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Thu Aug 30
06:19:50 2012
@@ -181,4 +181,15 @@ public interface BSPPeer<K1, V1, K2, V2,
* incremented.
*/
public void incrementCounter(String group, String counter, long amount);
+
+ /**
+ * Returns the size of the input split assigned to this peer.
+ *
+ * @return the size of the assigned split, presumably in bytes as reported by
+ *         the split's getLength() (TODO confirm for non-file splits); the
+ *         implementations in this change report 0 when no split was set up
+ */
+ public long getSplitSize();
+
+ /**
+ * Returns the current position of this peer's input read pointer, so callers
+ * can measure how much of the assigned split has been consumed.
+ *
+ * @return the current position of the input read pointer
+ * @throws IOException if the underlying reader cannot report its position
+ */
+ public long getPos() throws IOException;
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1378811&r1=1378810&r2=1378811&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Aug
30 06:19:50 2012
@@ -95,6 +95,8 @@ public final class BSPPeerImpl<K1, V1, K
private FaultTolerantPeerService<M> faultToleranceService;
+ private long splitSize = 0L;
+
/**
* Protected default constructor for LocalBSPRunner.
*/
@@ -335,9 +337,21 @@ public final class BSPPeerImpl<K1, V1, K
.getRecordReader(inputSplit, bspJob),
getCounter(BSPPeerImpl.PeerCounter.TASK_INPUT_RECORDS),
getCounter(BSPPeerImpl.PeerCounter.IO_BYTES_READ));
+ this.splitSize = inputSplit.getLength();
}
}
+ /**
+ * Returns the length of the input split this peer was set up with.
+ *
+ * @return the value taken from the split's getLength() when the record
+ *         reader was created; otherwise the field's initial value of 0
+ */
+ public long getSplitSize() {
+ return this.splitSize;
+ }
+
+ /**
+ * Returns the current position of this peer's input record reader.
+ *
+ * NOTE(review): this dereferences the reader unconditionally; if a peer can
+ * run without an input split, confirm the reader is non-null at this point.
+ *
+ * @return the read position reported by the record reader
+ * @throws IOException if the record reader fails to report its position
+ */
+ public long getPos() throws IOException {
+ return this.in.getPos();
+ }
+
public final void initilizeMessaging() throws ClassNotFoundException {
messenger = MessageManagerFactory.getMessageManager(conf);
messenger.init(taskId, this, conf, peerAddress);
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1378811&r1=1378810&r2=1378811&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
(original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Thu
Aug 30 06:19:50 2012
@@ -286,6 +286,18 @@ public class TestCheckpoint extends Test
}
+ @Override
+ public long getSplitSize() {
+ // Stub: always reports an empty split (presumably this test peer reads no
+ // input split — confirm against the enclosing test class).
+ return 0;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ // Stub: always reports read position 0; no record reader is consulted.
+ return 0;
+ }
+
}
public static class TempSyncClient extends BSPPeerSyncClient {
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1378811&r1=1378810&r2=1378811&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Thu Aug 30 06:19:50 2012
@@ -19,18 +19,18 @@ package org.apache.hama.graph;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -386,14 +386,24 @@ public final class GraphJobRunner<V exte
VertexInputReader<Writable, Writable, V, E, M> reader)
throws IOException, SyncException, InterruptedException {
+ // //////////////////////////////////
+ long splitSize = peer.getSplitSize();
+ int partitioningSteps = computeMultiSteps(peer, splitSize);
+ long interval = splitSize / partitioningSteps;
+ // //////////////////////////////////
+
LOG.debug("vertex class: " + vertexClass);
boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
Vertex<V, E, M> vertex = newVertexInstance(vertexClass, conf);
vertex.setPeer(peer);
vertex.runner = this;
+ long startPos = peer.getPos();
+ if (startPos == 0)
+ startPos = 1L;
+
KeyValuePair<Writable, Writable> next = null;
- int lines = 0;
+ int steps = 1;
while ((next = peer.readNext()) != null) {
boolean vertexFinished = reader.parseVertex(next.getKey(),
next.getValue(), vertex);
@@ -428,22 +438,26 @@ public final class GraphJobRunner<V exte
vertex.setPeer(peer);
vertex.runner = this;
- lines++;
- if ((lines % 100000) == 0) {
- peer.sync();
- GraphJobMessage msg = null;
- while ((msg = peer.getCurrentMessage()) != null) {
- Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
- messagedVertex.setPeer(peer);
- messagedVertex.runner = this;
- messagedVertex.setup(conf);
- vertices.put(messagedVertex.getVertexID(), messagedVertex);
+ if (runtimePartitioning) {
+ if (steps < partitioningSteps && (peer.getPos() - startPos) >=
interval) {
+ peer.sync();
+ steps++;
+ GraphJobMessage msg = null;
+ while ((msg = peer.getCurrentMessage()) != null) {
+ Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
+ messagedVertex.setPeer(peer);
+ messagedVertex.runner = this;
+ messagedVertex.setup(conf);
+ vertices.put(messagedVertex.getVertexID(), messagedVertex);
+ }
+ startPos = peer.getPos();
}
}
}
if (runtimePartitioning) {
peer.sync();
+
GraphJobMessage msg = null;
while ((msg = peer.getCurrentMessage()) != null) {
Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
@@ -453,8 +467,7 @@ public final class GraphJobRunner<V exte
vertices.put(messagedVertex.getVertexID(), messagedVertex);
}
}
-
- LOG.info("Loading finished at " + peer.getSuperstepCount() + " steps.");
+ LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps.");
/*
* If the user want to repair the graph, it should traverse through that
@@ -465,14 +478,88 @@ public final class GraphJobRunner<V exte
*/
if (repairNeeded) {
LOG.debug("Starting repair of this graph!");
- final Collection<Vertex<V, E, M>> entries = vertices.values();
- for (Vertex<V, E, M> entry : entries) {
- List<Edge<V, E>> outEdges = entry.getEdges();
- for (Edge<V, E> e : outEdges) {
+
+ int multiSteps = 0;
+ MapWritable ssize = new MapWritable();
+ ssize
+ .put(new IntWritable(peer.getPeerIndex()), new
IntWritable(vertices.size()));
+ peer.send(masterTask, new GraphJobMessage(ssize));
+ ssize = null;
+ peer.sync();
+
+ if (this.isMasterTask(peer)) {
+ int minVerticesSize = Integer.MAX_VALUE;
+ GraphJobMessage received = null;
+ while ((received = peer.getCurrentMessage()) != null) {
+ MapWritable x = received.getMap();
+ for (Entry<Writable, Writable> e : x.entrySet()) {
+ int curr = ((IntWritable) e.getValue()).get();
+ if (minVerticesSize > curr) {
+ minVerticesSize = curr;
+ }
+ }
+ }
+
+ if(minVerticesSize < (partitioningSteps * 2)) {
+ multiSteps = minVerticesSize;
+ } else {
+ multiSteps = (partitioningSteps * 2);
+ }
+
+ for (String peerName : peer.getAllPeerNames()) {
+ MapWritable temp = new MapWritable();
+ temp.put(new Text("steps"), new IntWritable(multiSteps));
+ peer.send(peerName, new GraphJobMessage(temp));
+ }
+ }
+ peer.sync();
+
+ GraphJobMessage received = peer.getCurrentMessage();
+ MapWritable x = received.getMap();
+ for (Entry<Writable, Writable> e : x.entrySet()) {
+ multiSteps = ((IntWritable) e.getValue()).get();
+ }
+
+ Set<V> keys = vertices.keySet();
+ Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
+
+ int i = 0;
+ int syncs = 0;
+ for (V v : keys) {
+ for (Edge<V, E> e : vertices.get(v).getEdges()) {
peer.send(e.getDestinationPeerName(),
new GraphJobMessage(e.getDestinationVertexID()));
}
+
+ if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) {
+ peer.sync();
+ syncs++;
+ GraphJobMessage msg = null;
+ while ((msg = peer.getCurrentMessage()) != null) {
+ V vertexName = (V) msg.getVertexId();
+ if (!vertices.containsKey(vertexName)) {
+ Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
+ newVertex.setPeer(peer);
+ newVertex.setVertexID(vertexName);
+ newVertex.runner = this;
+ if (selfReference) {
+ int partition = partitioner.getPartition(
+ newVertex.getVertexID(), newVertex.getValue(),
+ peer.getNumPeers());
+ String target = peer.getPeerName(partition);
+ newVertex.setEdges(Collections.singletonList(new Edge<V, E>(
+ newVertex.getVertexID(), target, null)));
+ } else {
+ newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
+ }
+ newVertex.setup(conf);
+ tmp.put(vertexName, newVertex);
+ }
+ }
+ }
+ i++;
}
+
peer.sync();
GraphJobMessage msg = null;
while ((msg = peer.getCurrentMessage()) != null) {
@@ -493,10 +580,62 @@ public final class GraphJobRunner<V exte
}
newVertex.setup(conf);
vertices.put(vertexName, newVertex);
+ newVertex = null;
+ }
+ }
+
+ for(Map.Entry<V, Vertex<V, E, M>> e : tmp.entrySet()) {
+ vertices.put(e.getKey(), e.getValue());
+ }
+ tmp.clear();
+ }
+
+ LOG.debug("Starting Vertex processing!");
+ }
+
+ private int computeMultiSteps(
+ BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
+ long splitSize) throws IOException, SyncException, InterruptedException {
+ int multiSteps = 1;
+
+ MapWritable ssize = new MapWritable();
+ ssize
+ .put(new IntWritable(peer.getPeerIndex()), new
LongWritable(splitSize));
+ peer.send(masterTask, new GraphJobMessage(ssize));
+ ssize = null;
+ peer.sync();
+
+ if (this.isMasterTask(peer)) {
+ long maxSplitSize = 0L;
+ GraphJobMessage received = null;
+ while ((received = peer.getCurrentMessage()) != null) {
+ MapWritable x = received.getMap();
+ for (Entry<Writable, Writable> e : x.entrySet()) {
+ long curr = ((LongWritable) e.getValue()).get();
+ if (maxSplitSize < curr) {
+ maxSplitSize = curr;
+ }
}
}
+
+ int steps = (int) (maxSplitSize / conf.getInt( // 20 mb
+ "hama.graph.multi.step.partitioning.interval", 20000000)) + 1;
+
+ for (String peerName : peer.getAllPeerNames()) {
+ MapWritable temp = new MapWritable();
+ temp.put(new Text("max"), new IntWritable(steps));
+ peer.send(peerName, new GraphJobMessage(temp));
+ }
}
+ peer.sync();
+ GraphJobMessage received = peer.getCurrentMessage();
+ MapWritable x = received.getMap();
+ for (Entry<Writable, Writable> e : x.entrySet()) {
+ multiSteps = ((IntWritable) e.getValue()).get();
+ }
+ LOG.info(peer.getPeerName() + ": " + multiSteps);
+ return multiSteps;
}
/**
Modified:
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1378811&r1=1378810&r2=1378811&view=diff
==============================================================================
---
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
(original)
+++
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
Thu Aug 30 06:19:50 2012
@@ -18,6 +18,7 @@
package org.apache.hama.graph;
import java.io.BufferedWriter;
+import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
@@ -54,7 +55,10 @@ public class TestSubmitGraphJob extends
public void testSubmitJob() throws Exception {
generateTestData();
-
+
+ // Set multi-step partitioning interval to 30 bytes
+ configuration.setInt("hama.graph.multi.step.partitioning.interval", 30);
+
GraphJob bsp = new GraphJob(configuration, PageRank.class);
bsp.setInputPath(new Path(INPUT));
bsp.setOutputPath(new Path(OUTPUT));
@@ -127,6 +131,10 @@ public class TestSubmitGraphJob extends
if (bw != null) {
try {
bw.close();
+
+ File file = new File(INPUT);
+ LOG.info("Temp file length: " + file.length());
+
} catch (IOException e) {
e.printStackTrace();
}