Author: jghoman
Date: Wed Aug 31 19:45:46 2011
New Revision: 1163764
URL: http://svn.apache.org/viewvc?rev=1163764&view=rev
Log:
GIRAPH-18. Refactor BspServiceWorker::loadVertices(). (jghoman)
Modified:
incubator/giraph/trunk/CHANGELOG
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
Modified: incubator/giraph/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1163764&r1=1163763&r2=1163764&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Wed Aug 31 19:45:46 2011
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.70.0 - unreleased
+
+ GIRAPH-18. Refactor BspServiceWorker::loadVertices(). (jghoman)
GIRAPH-14. Support for the Facebook Hadoop branch. (aching)
Modified:
incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL:
http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1163764&r1=1163763&r2=1163764&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Aug 31 19:45:46 2011
@@ -292,205 +292,297 @@ public class BspServiceWorker<
private void loadVertices() throws IOException, ClassNotFoundException,
InterruptedException, InstantiationException,
IllegalAccessException {
- List<Vertex<I, V, E, M>> vertexList =
- new ArrayList<Vertex<I, V, E, M>>();
String inputSplitPath = null;
while ((inputSplitPath = reserveInputSplit()) != null) {
- // ZooKeeper has a limit of the data in a single znode of 1 MB and
- // each entry can go be on the average somewhat more than 300 bytes
- final long maxVertexRangesPerInputSplit =
- 1024 * 1024 / 350 / inputSplitCount;
+ Map<I, List<Long>> maxIndexStatMap =
+ loadVerticesFromInputSplit(inputSplitPath);
+ setInputSplitVertexRanges(inputSplitPath, maxIndexStatMap);
+ }
+ }
- byte[] splitList;
- try {
- splitList = getZkExt().getData(inputSplitPath, false, null);
- } catch (KeeperException e) {
- throw new IllegalStateException(
- "loadVertices: KeeperException on " + inputSplitPath, e);
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "loadVertices: IllegalStateException on " +
- inputSplitPath, e);
- }
- getContext().progress();
+ /**
+ * Extract vertices from input split, generating mapping of Indices to
+ * statistics about the vertices. As a side effect, load the vertices
+ * into local global map
+ *
+ * @param inputSplitPath ZK location of input split
+ * @return Mapping of vertex indices and statistics, or null if no data read
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ */
+ private Map<I, List<Long>> loadVerticesFromInputSplit(String
inputSplitPath)
+ throws IOException, ClassNotFoundException, InterruptedException,
+ InstantiationException, IllegalAccessException {
+ InputSplit inputSplit = getInputSplitForVertices(inputSplitPath);
- DataInputStream inputStream =
- new DataInputStream(new ByteArrayInputStream(splitList));
- String inputSplitClass = Text.readString(inputStream);
- InputSplit inputSplit = (InputSplit)
- ReflectionUtils.newInstance(
- getConfiguration().getClassByName(inputSplitClass),
- getConfiguration());
- ((Writable) inputSplit).readFields(inputStream);
- if (LOG.isInfoEnabled()) {
- LOG.info("loadVertices: Reserved " + inputSplitPath +
- " from ZooKeeper and got input split '" +
- inputSplit.toString() + "'");
- }
- VertexInputFormat<I, V, E> vertexInputFormat =
- BspUtils.<I, V, E>createVertexInputFormat(getConfiguration());
- VertexReader<I, V, E> vertexReader =
- vertexInputFormat.createVertexReader(inputSplit, getContext());
- vertexReader.initialize(inputSplit, getContext());
- vertexList.clear();
- Vertex<I, V, E, M> readerVertex =
- BspUtils.<I, V, E, M>createVertex(getConfiguration());
- while (vertexReader.next(readerVertex)) {
- if (readerVertex.getVertexId() == null) {
+ List<Vertex<I, V, E, M>> vertexList =
+ readVerticesFromInputSplit(inputSplit);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("loadVertices: Got " + vertexList.size() +
+ " vertices from input split " + inputSplit);
+ }
+
+ if (vertexList.isEmpty()) {
+ return null;
+ }
+
+ NavigableMap<I, VertexRange<I, V, E, M>> vertexRangeMap =
+ getVertexRanges(inputSplit, vertexList);
+
+ Map<I, List<Long>> maxIndexStatMap =
getMaxIndexStatMap(vertexRangeMap);
+
+ return maxIndexStatMap;
+ }
+
+ /**
+ * Talk to ZooKeeper to convert the input split path to the actual
+ * InputSplit containing the vertices to read.
+ *
+ * @param inputSplitPath Location in ZK of input split
+ * @return instance of InputSplit containing vertices to read
+ * @throws IOException
+ * @throws ClassNotFoundException
+ */
+ private InputSplit getInputSplitForVertices(String inputSplitPath)
+ throws IOException, ClassNotFoundException {
+ byte[] splitList;
+ try {
+ splitList = getZkExt().getData(inputSplitPath, false, null);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "loadVertices: KeeperException on " + inputSplitPath, e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "loadVertices: IllegalStateException on " + inputSplitPath, e);
+ }
+ getContext().progress();
+
+ DataInputStream inputStream =
+ new DataInputStream(new ByteArrayInputStream(splitList));
+ String inputSplitClass = Text.readString(inputStream);
+ InputSplit inputSplit = (InputSplit)
+ ReflectionUtils.newInstance(
+ getConfiguration().getClassByName(inputSplitClass),
+ getConfiguration());
+ ((Writable) inputSplit).readFields(inputStream);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("loadVertices: Reserved " + inputSplitPath +
+ " from ZooKeeper and got input split '" +
+ inputSplit.toString() + "'");
+ }
+ return inputSplit;
+ }
+
+ /**
+ * Read vertices from input split.
+ *
+ * @param inputSplit Input split to process with vertex reader
+ * @return List of vertices.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private List<Vertex<I, V, E, M>> readVerticesFromInputSplit(
+ InputSplit inputSplit) throws IOException, InterruptedException {
+ List<Vertex<I, V, E, M>> vertexList = new ArrayList<Vertex<I, V, E,
M>>();
+ VertexInputFormat<I, V, E> vertexInputFormat =
+ BspUtils.<I, V, E>createVertexInputFormat(getConfiguration());
+ VertexReader<I, V, E> vertexReader =
+ vertexInputFormat.createVertexReader(inputSplit, getContext());
+ vertexReader.initialize(inputSplit, getContext());
+ Vertex<I, V, E, M> readerVertex =
+ BspUtils.<I, V, E, M>createVertex(getConfiguration());
+
+ while (vertexReader.next(readerVertex)) {
+ if (readerVertex.getVertexId() == null) {
+ throw new IllegalArgumentException(
+ "loadVertices: Vertex reader returned a vertex " +
+ "without an id! - " + readerVertex);
+ }
+ if (readerVertex.getVertexValue() == null) {
+ readerVertex.setVertexValue(
+ BspUtils.<V>createVertexValue(getConfiguration()));
+ }
+ // Vertices must be ordered
+ if (!vertexList.isEmpty()) {
+ @SuppressWarnings("unchecked")
+ int compareTo =
+ vertexList.get(vertexList.size() - 1).
+ getVertexId().compareTo(readerVertex.getVertexId());
+ if (compareTo > 0) {
throw new IllegalArgumentException(
- "loadVertices: Vertex reader returned a vertex " +
- "without an id! - " + readerVertex);
- }
- if (readerVertex.getVertexValue() == null) {
- readerVertex.setVertexValue(
- BspUtils.<V>createVertexValue(getConfiguration()));
- }
- // Vertices must be ordered
- if (!vertexList.isEmpty()) {
- @SuppressWarnings("unchecked")
- int compareTo =
- vertexList.get(vertexList.size() - 1).
- getVertexId().compareTo(readerVertex.getVertexId());
- if (compareTo > 0) {
- throw new IllegalArgumentException(
- "loadVertices: Illegal out of order vertices " +
- "from vertex reader previous vertex = " +
- vertexList.get(vertexList.size() - 1) +
- ", next vertex = " + readerVertex);
- }
+ "loadVertices: Illegal out of order vertices " +
+ "from vertex reader previous vertex = " +
+ vertexList.get(vertexList.size() - 1) +
+ ", next vertex = " + readerVertex);
}
- vertexList.add(readerVertex);
- readerVertex =
- BspUtils.<I, V, E, M>createVertex(getConfiguration());
- getContext().progress();
- }
- vertexReader.close();
- if (LOG.isInfoEnabled()) {
- LOG.info("loadVertices: Got " + vertexList.size() +
- " vertices from input split " + inputSplit);
- }
- if (vertexList.isEmpty()) {
- setInputSplitVertexRanges(inputSplitPath, null);
- continue;
}
+ vertexList.add(readerVertex);
+ readerVertex = BspUtils.<I, V, E,
M>createVertex(getConfiguration());
+ getContext().progress();
+ }
+ vertexReader.close();
- // Separate all the vertices in this InputSplit into vertex ranges.
- // The number of vertex ranges is up to half of the number of
- // available workers and must reach a minimum size. Then two passes
- // over the vertexList. First, find the maximum vertex ranges.
- // Then fill them in.
- NavigableMap<I, VertexRange<I, V, E, M>> vertexRangeMap =
- new TreeMap<I, VertexRange<I, V, E, M>>();
- long vertexRangesPerInputSplit = (long) (inputSplitCount *
- getConfiguration().getFloat(
- GiraphJob.TOTAL_INPUT_SPLIT_MULTIPLIER,
- GiraphJob.TOTAL_INPUT_SPLIT_MULTIPLIER_DEFAULT));
- if (vertexRangesPerInputSplit == 0) {
- vertexRangesPerInputSplit = 1;
- }
- else if (vertexRangesPerInputSplit > maxVertexRangesPerInputSplit)
{
- LOG.warn("loadVertices: Using " + maxVertexRangesPerInputSplit
+
- " instead of " + vertexRangesPerInputSplit +
- " vertex ranges on input split " + inputSplit);
- vertexRangesPerInputSplit = maxVertexRangesPerInputSplit;
- }
-
- long vertexRangeSize =
- vertexList.size() / vertexRangesPerInputSplit;
- long minPerVertexRange =
- getConfiguration().getLong(
- GiraphJob.MIN_VERTICES_PER_RANGE,
- GiraphJob.MIN_VERTICES_PER_RANGE_DEFAULT);
- if (vertexRangeSize < minPerVertexRange) {
- vertexRangeSize = minPerVertexRange;
- }
- I vertexIdMax = null;
- for (int i = 0; i < vertexList.size(); ++i) {
- if ((vertexIdMax != null) && ((i % vertexRangeSize) == 0)) {
- VertexRange<I, V, E, M> vertexRange =
- new VertexRange<I, V, E, M>(
- null, -1, null, vertexIdMax, null);
- vertexRangeMap.put(vertexIdMax, vertexRange);
- vertexIdMax = null;
- }
+ return vertexList;
+ }
- if (vertexIdMax == null) {
- vertexIdMax = vertexList.get(i).getVertexId();
- } else {
- @SuppressWarnings("unchecked")
- int compareTo =
- vertexList.get(i).getVertexId().compareTo(vertexIdMax);
- if (compareTo > 0) {
- vertexIdMax = vertexList.get(i).getVertexId();
- }
- }
+ /**
+ * Partition the vertices into their respective VertexRanges. The number of
+ * vertex ranges may be up to half of the number of available workers and
+ * must reach a minimum size.
+ *
+ * @param inputSplit inputSplit for these vertices
+ * @param vertexList vertices from which to build the ranges
+ * @return Ordered map of maxi vertex ID in range <-> vertices within range
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws IOException
+ */
+ private NavigableMap<I, VertexRange<I, V, E, M>> getVertexRanges(
+ InputSplit inputSplit, List<Vertex<I, V, E, M>> vertexList)
+ throws InstantiationException, IllegalAccessException, IOException {
+
+ NavigableMap<I, VertexRange<I, V, E, M>> vertexRangeMap =
+ new TreeMap<I, VertexRange<I, V, E, M>>();
+ long vertexRangesPerInputSplit =
getVertexRangesPerInputSplit(inputSplit);
+ long vertexRangeSize = vertexList.size() / vertexRangesPerInputSplit;
+ long minPerVertexRange =
+ getConfiguration().getLong(
+ GiraphJob.MIN_VERTICES_PER_RANGE,
+ GiraphJob.MIN_VERTICES_PER_RANGE_DEFAULT);
+ if (vertexRangeSize < minPerVertexRange) {
+ vertexRangeSize = minPerVertexRange;
+ }
+ I vertexIdMax = null;
+
+ // Identify the endpoints of the ranges, create empty placeholders
+ for (int i = 0; i < vertexList.size(); ++i) {
+ if ((vertexIdMax != null) && ((i % vertexRangeSize) == 0)) {
+ VertexRange<I, V, E, M> vertexRange =
+ new VertexRange<I, V, E, M>(
+ null, -1, null, vertexIdMax, null);
+ vertexRangeMap.put(vertexIdMax, vertexRange);
+ vertexIdMax = null;
}
+
if (vertexIdMax == null) {
- throw new RuntimeException("loadVertices: Encountered " +
- "impossible null vertexIdMax.");
- }
- VertexRange<I, V, E, M> vertexRange =
- new VertexRange<I, V, E, M>(
- null, -1, null, vertexIdMax, null);
- vertexRangeMap.put(vertexIdMax, vertexRange);
-
- Iterator<I> maxIndexVertexMapIt =
- vertexRangeMap.keySet().iterator();
- I currentVertexIndexMax = maxIndexVertexMapIt.next();
- for (Vertex<I, V, E, M> vertex : vertexList) {
+ vertexIdMax = vertexList.get(i).getVertexId();
+ } else {
@SuppressWarnings("unchecked")
int compareTo =
- vertex.getVertexId().compareTo(
- currentVertexIndexMax);
+ vertexList.get(i).getVertexId().compareTo(vertexIdMax);
if (compareTo > 0) {
- if (!maxIndexVertexMapIt.hasNext()) {
- throw new RuntimeException(
- "loadVertices: Impossible that vertex " +
- vertex.getVertexId() + " > " +
- currentVertexIndexMax);
- }
- currentVertexIndexMax = maxIndexVertexMapIt.next();
+ vertexIdMax = vertexList.get(i).getVertexId();
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("loadVertices: Adding vertex with index = " +
- vertex.getVertexId() + " to vertex range max = "
+
- currentVertexIndexMax);
- }
- if (vertexRangeMap.get(currentVertexIndexMax).
- getVertexMap().put(vertex.getVertexId(),
- vertex) != null) {
- throw new IllegalStateException(
- "loadVertices: Already contains vertex " +
- vertex.toString() + " in vertex range max " +
+ }
+ }
+ if (vertexIdMax == null) {
+ throw new RuntimeException("loadVertices: Encountered " +
+ "impossible null vertexIdMax.");
+ }
+ VertexRange<I, V, E, M> finalRange =
+ new VertexRange<I, V, E, M>(null, -1, null, vertexIdMax, null);
+ vertexRangeMap.put(vertexIdMax, finalRange);
+
+ // Now iterate over the defined ranges, placing each vertex in its range
+ Iterator<I> maxIndexVertexMapIt = vertexRangeMap.keySet().iterator();
+ I currentVertexIndexMax = maxIndexVertexMapIt.next();
+ for (Vertex<I, V, E, M> vertex : vertexList) {
+ @SuppressWarnings("unchecked")
+ int compareTo =
vertex.getVertexId().compareTo(currentVertexIndexMax);
+ if (compareTo > 0) {
+ if (!maxIndexVertexMapIt.hasNext()) {
+ throw new RuntimeException(
+ "loadVertices: Impossible that vertex " +
+ vertex.getVertexId() + " > " +
currentVertexIndexMax);
}
+ currentVertexIndexMax = maxIndexVertexMapIt.next();
}
- Map<I, List<Long>> maxIndexStatMap = new TreeMap<I, List<Long>>();
- for (Entry<I, VertexRange<I, V, E, M>> entry :
- vertexRangeMap.entrySet()) {
- List<Long> statList = new ArrayList<Long>();
- long vertexRangeEdgeCount = 0;
- for (BasicVertex<I, V, E, M> vertex :
- entry.getValue().getVertexMap().values()) {
- vertexRangeEdgeCount += vertex.getOutEdgeMap().size();
- }
- statList.add(Long.valueOf
- (entry.getValue().getVertexMap().size()));
- statList.add(Long.valueOf(vertexRangeEdgeCount));
- if (LOG.isInfoEnabled()) {
- LOG.info("loadVertices: Got " + statList.get(0) +
- " vertices and " + statList.get(1) +
- " edges from vertex range max index " +
- entry.getKey());
- }
- maxIndexStatMap.put(entry.getKey(), statList);
-
- // Add the local vertex ranges to the stored vertex ranges
- getStorableVertexRangeMap().put(entry.getKey(),
- entry.getValue());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("loadVertices: Adding vertex with index = " +
+ vertex.getVertexId() + " to vertex range max = " +
+ currentVertexIndexMax);
+ }
+ VertexRange<I, V, E, M> range =
+ vertexRangeMap.get(currentVertexIndexMax);
+ SortedMap<I, BasicVertex<I, V, E, M>> vertexMap =
+ range.getVertexMap();
+ if (vertexMap.put(vertex.getVertexId(), vertex) != null) {
+ throw new IllegalStateException(
+ "loadVertices: Already contains vertex " +
+ vertex.toString() + " in vertex range max " +
+ currentVertexIndexMax);
}
- setInputSplitVertexRanges(inputSplitPath, maxIndexStatMap);
}
+ return vertexRangeMap;
+ }
+
+ /**
+ * Determine the number of VertexRanges per InputSplit.
+ *
+ * @param inputSplit inputSplit to generate number for
+ * @return number of VertexRanges per split
+ */
+ private long getVertexRangesPerInputSplit(InputSplit inputSplit) {
+ // ZooKeeper has a limit of the data in a single znode of 1 MB and
+ // each entry can go be on the average somewhat more than 300 bytes
+ final long
+ maxVertexRangesPerInputSplit = 1024 * 1024 / 350 / inputSplitCount;
+
+ long vertexRangesPerInputSplit = (long) (inputSplitCount *
+ getConfiguration().getFloat(
+ GiraphJob.TOTAL_INPUT_SPLIT_MULTIPLIER,
+ GiraphJob.TOTAL_INPUT_SPLIT_MULTIPLIER_DEFAULT));
+ if (vertexRangesPerInputSplit == 0) {
+ vertexRangesPerInputSplit = 1;
+ } else if (vertexRangesPerInputSplit > maxVertexRangesPerInputSplit) {
+ LOG.warn("loadVertices: Using " + maxVertexRangesPerInputSplit +
+ " instead of " + vertexRangesPerInputSplit +
+ " vertex ranges on input split " + inputSplit);
+ vertexRangesPerInputSplit = maxVertexRangesPerInputSplit;
+ }
+ return vertexRangesPerInputSplit;
+ }
+
+ /**
+ * Generate a mapping of max vertex indices to two-element lists consisting
+ * of the number of vertices and the number of edges in each partition.
+ * Additionally, adds entries of the vertexRangeMap to local map.
+ *
+ * @param vertexRangeMap Mapping of indices to VertexRanges
+ * @return mapping of vertices to stats, or null if no vertices
+ * are written for the partition
+ */
+ private Map<I, List<Long>> getMaxIndexStatMap(
+ NavigableMap<I, VertexRange<I, V, E, M>> vertexRangeMap) {
+ Map<I, List<Long>> maxIndexStatMap = new TreeMap<I, List<Long>>();
+
+ for (Entry<I, VertexRange<I, V, E, M>> entry :
+ vertexRangeMap.entrySet()) {
+ List<Long> statList = new ArrayList<Long>(2);
+ long vertexRangeEdgeCount = 0;
+ for (BasicVertex<I, V, E, M> vertex :
+ entry.getValue().getVertexMap().values()) {
+ vertexRangeEdgeCount += vertex.getOutEdgeMap().size();
+ }
+ statList.add(Long.valueOf(entry.getValue().getVertexMap().size()));
+ statList.add(Long.valueOf(vertexRangeEdgeCount));
+ if (LOG.isInfoEnabled()) {
+ LOG.info("loadVertices: Got " + statList.get(0) +
+ " vertices and " + statList.get(1) +
+ " edges from vertex range max index " + entry.getKey());
+ }
+ maxIndexStatMap.put(entry.getKey(), statList);
+
+ // Add the local vertex ranges to the stored vertex ranges
+ getStorableVertexRangeMap().put(entry.getKey(), entry.getValue());
+ }
+ return maxIndexStatMap;
}
@Override