Author: jghoman
Date: Wed Aug 31 19:45:46 2011
New Revision: 1163764

URL: http://svn.apache.org/viewvc?rev=1163764&view=rev
Log:
GIRAPH-18. Refactor BspServiceWorker::loadVertices(). (jghoman)

Modified:
    incubator/giraph/trunk/CHANGELOG
    incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java

Modified: incubator/giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/CHANGELOG?rev=1163764&r1=1163763&r2=1163764&view=diff
==============================================================================
--- incubator/giraph/trunk/CHANGELOG (original)
+++ incubator/giraph/trunk/CHANGELOG Wed Aug 31 19:45:46 2011
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.70.0 - unreleased
+
+  GIRAPH-18. Refactor BspServiceWorker::loadVertices(). (jghoman)
   
   GIRAPH-14. Support for the Facebook Hadoop branch. (aching)
 

Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1163764&r1=1163763&r2=1163764&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Wed Aug 31 19:45:46 2011
@@ -292,205 +292,297 @@ public class BspServiceWorker<
     private void loadVertices() throws IOException, ClassNotFoundException,
             InterruptedException, InstantiationException,
             IllegalAccessException {
-        List<Vertex<I, V, E, M>> vertexList =
-            new ArrayList<Vertex<I, V, E, M>>();
         String inputSplitPath = null;
         while ((inputSplitPath = reserveInputSplit()) != null) {
-            // ZooKeeper has a limit of the data in a single znode of 1 MB and
-            // each entry can go be on the average somewhat more than 300 bytes
-            final long maxVertexRangesPerInputSplit =
-                1024 * 1024 / 350 / inputSplitCount;
+            Map<I, List<Long>> maxIndexStatMap =
+                loadVerticesFromInputSplit(inputSplitPath);
+            setInputSplitVertexRanges(inputSplitPath, maxIndexStatMap);
+        }
+    }
 
-            byte[] splitList;
-            try {
-                splitList = getZkExt().getData(inputSplitPath, false, null);
-            } catch (KeeperException e) {
-                throw new IllegalStateException(
-                    "loadVertices: KeeperException on " + inputSplitPath, e);
-            } catch (InterruptedException e) {
-                throw new IllegalStateException(
-                    "loadVertices: IllegalStateException on " +
-                    inputSplitPath, e);
-            }
-            getContext().progress();
+    /**
+     * Extract vertices from input split, generating mapping of Indices to
+     * statistics about the vertices. As a side effect, load the vertices
+     * into local global map
+     *
+     * @param inputSplitPath ZK location of input split
+     * @return Mapping of vertex indices and statistics, or null if no data read
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     */
+    private Map<I, List<Long>> loadVerticesFromInputSplit(String inputSplitPath)
+        throws IOException, ClassNotFoundException, InterruptedException,
+               InstantiationException, IllegalAccessException {
+        InputSplit inputSplit = getInputSplitForVertices(inputSplitPath);
 
-            DataInputStream inputStream =
-                new DataInputStream(new ByteArrayInputStream(splitList));
-            String inputSplitClass = Text.readString(inputStream);
-            InputSplit inputSplit = (InputSplit)
-                ReflectionUtils.newInstance(
-                    getConfiguration().getClassByName(inputSplitClass),
-                    getConfiguration());
-            ((Writable) inputSplit).readFields(inputStream);
-            if (LOG.isInfoEnabled()) {
-                LOG.info("loadVertices: Reserved " + inputSplitPath +
-                         " from ZooKeeper and got input split '" +
-                         inputSplit.toString() + "'");
-            }
-            VertexInputFormat<I, V, E> vertexInputFormat =
-                BspUtils.<I, V, E>createVertexInputFormat(getConfiguration());
-            VertexReader<I, V, E> vertexReader =
-                vertexInputFormat.createVertexReader(inputSplit, getContext());
-            vertexReader.initialize(inputSplit, getContext());
-            vertexList.clear();
-            Vertex<I, V, E, M> readerVertex =
-                BspUtils.<I, V, E, M>createVertex(getConfiguration());
-            while (vertexReader.next(readerVertex)) {
-                if (readerVertex.getVertexId() == null) {
+        List<Vertex<I, V, E, M>> vertexList =
+            readVerticesFromInputSplit(inputSplit);
+
+        if (LOG.isInfoEnabled()) {
+            LOG.info("loadVertices: Got " + vertexList.size() +
+                 " vertices from input split " + inputSplit);
+        }
+
+        if (vertexList.isEmpty()) {
+            return null;
+        }
+
+        NavigableMap<I, VertexRange<I, V, E, M>> vertexRangeMap =
+            getVertexRanges(inputSplit, vertexList);
+
+        Map<I, List<Long>> maxIndexStatMap = getMaxIndexStatMap(vertexRangeMap);
+
+        return maxIndexStatMap;
+    }
+
+    /**
+     * Talk to ZooKeeper to convert the input split path to the actual
+     * InputSplit containing the vertices to read.
+     *
+     * @param inputSplitPath Location in ZK of input split
+     * @return instance of InputSplit containing vertices to read
+     * @throws IOException
+     * @throws ClassNotFoundException
+     */
+    private InputSplit getInputSplitForVertices(String inputSplitPath)
+        throws IOException, ClassNotFoundException {
+        byte[] splitList;
+        try {
+            splitList = getZkExt().getData(inputSplitPath, false, null);
+        } catch (KeeperException e) {
+            throw new IllegalStateException(
+               "loadVertices: KeeperException on " + inputSplitPath, e);
+        } catch (InterruptedException e) {
+            throw new IllegalStateException(
+                "loadVertices: IllegalStateException on " + inputSplitPath, e);
+        }
+        getContext().progress();
+
+        DataInputStream inputStream =
+            new DataInputStream(new ByteArrayInputStream(splitList));
+        String inputSplitClass = Text.readString(inputStream);
+        InputSplit inputSplit = (InputSplit)
+            ReflectionUtils.newInstance(
+                getConfiguration().getClassByName(inputSplitClass),
+                getConfiguration());
+        ((Writable) inputSplit).readFields(inputStream);
+
+        if (LOG.isInfoEnabled()) {
+            LOG.info("loadVertices: Reserved " + inputSplitPath +
+                 " from ZooKeeper and got input split '" +
+                 inputSplit.toString() + "'");
+        }
+        return inputSplit;
+    }
+
+    /**
+     * Read vertices from input split.
+     *
+     * @param inputSplit Input split to process with vertex reader
+     * @return List of vertices.
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    private List<Vertex<I, V, E, M>> readVerticesFromInputSplit(
+        InputSplit inputSplit) throws IOException, InterruptedException {
+        List<Vertex<I, V, E, M>> vertexList = new ArrayList<Vertex<I, V, E, M>>();
+        VertexInputFormat<I, V, E> vertexInputFormat =
+            BspUtils.<I, V, E>createVertexInputFormat(getConfiguration());
+        VertexReader<I, V, E> vertexReader =
+            vertexInputFormat.createVertexReader(inputSplit, getContext());
+        vertexReader.initialize(inputSplit, getContext());
+        Vertex<I, V, E, M> readerVertex =
+            BspUtils.<I, V, E, M>createVertex(getConfiguration());
+
+        while (vertexReader.next(readerVertex)) {
+            if (readerVertex.getVertexId() == null) {
+                throw new IllegalArgumentException(
+                    "loadVertices: Vertex reader returned a vertex " +
+                    "without an id!  - " + readerVertex);
+            }
+            if (readerVertex.getVertexValue() == null) {
+                readerVertex.setVertexValue(
+                    BspUtils.<V>createVertexValue(getConfiguration()));
+            }
+            // Vertices must be ordered
+            if (!vertexList.isEmpty()) {
+                @SuppressWarnings("unchecked")
+                int compareTo =
+                    vertexList.get(vertexList.size() - 1).
+                    getVertexId().compareTo(readerVertex.getVertexId());
+                if (compareTo > 0) {
                     throw new IllegalArgumentException(
-                        "loadVertices: Vertex reader returned a vertex " +
-                        "without an id!  - " + readerVertex);
-                }
-                if (readerVertex.getVertexValue() == null) {
-                    readerVertex.setVertexValue(
-                        BspUtils.<V>createVertexValue(getConfiguration()));
-                }
-                // Vertices must be ordered
-                if (!vertexList.isEmpty()) {
-                    @SuppressWarnings("unchecked")
-                    int compareTo =
-                        vertexList.get(vertexList.size() - 1).
-                        getVertexId().compareTo(readerVertex.getVertexId());
-                    if (compareTo > 0) {
-                        throw new IllegalArgumentException(
-                            "loadVertices: Illegal out of order vertices " +
-                            "from vertex reader previous vertex = " +
-                            vertexList.get(vertexList.size() - 1) +
-                            ", next vertex = " + readerVertex);
-                    }
+                        "loadVertices: Illegal out of order vertices " +
+                        "from vertex reader previous vertex = " +
+                        vertexList.get(vertexList.size() - 1) +
+                        ", next vertex = " + readerVertex);
                 }
-                vertexList.add(readerVertex);
-                readerVertex =
-                    BspUtils.<I, V, E, M>createVertex(getConfiguration());
-                getContext().progress();
-            }
-            vertexReader.close();
-            if (LOG.isInfoEnabled()) {
-                LOG.info("loadVertices: Got " + vertexList.size() +
-                         " vertices from input split " + inputSplit);
-            }
-            if (vertexList.isEmpty()) {
-                setInputSplitVertexRanges(inputSplitPath, null);
-                continue;
             }
+            vertexList.add(readerVertex);
+            readerVertex = BspUtils.<I, V, E, M>createVertex(getConfiguration());
+            getContext().progress();
+        }
+        vertexReader.close();
 
-            // Separate all the vertices in this InputSplit into vertex ranges.
-            // The number of vertex ranges is up to half of the number of
-            // available workers and must reach a minimum size.  Then two passes
-            // over the vertexList.  First, find the maximum vertex ranges.
-            // Then fill them in.
-            NavigableMap<I, VertexRange<I, V, E, M>> vertexRangeMap =
-                new TreeMap<I, VertexRange<I, V, E, M>>();
-            long vertexRangesPerInputSplit = (long) (inputSplitCount *
-                getConfiguration().getFloat(
-                    GiraphJob.TOTAL_INPUT_SPLIT_MULTIPLIER,
-                    GiraphJob.TOTAL_INPUT_SPLIT_MULTIPLIER_DEFAULT));
-            if (vertexRangesPerInputSplit == 0) {
-                vertexRangesPerInputSplit = 1;
-            }
-            else if (vertexRangesPerInputSplit > maxVertexRangesPerInputSplit) {
-                LOG.warn("loadVertices: Using " + maxVertexRangesPerInputSplit +
-                         " instead of " + vertexRangesPerInputSplit +
-                         " vertex ranges on input split " + inputSplit);
-                vertexRangesPerInputSplit = maxVertexRangesPerInputSplit;
-            }
-
-            long vertexRangeSize =
-                vertexList.size() / vertexRangesPerInputSplit;
-            long minPerVertexRange =
-                getConfiguration().getLong(
-                    GiraphJob.MIN_VERTICES_PER_RANGE,
-                    GiraphJob.MIN_VERTICES_PER_RANGE_DEFAULT);
-            if (vertexRangeSize < minPerVertexRange) {
-                vertexRangeSize = minPerVertexRange;
-            }
-            I vertexIdMax = null;
-            for (int i = 0; i < vertexList.size(); ++i) {
-                if ((vertexIdMax != null) && ((i % vertexRangeSize) == 0)) {
-                    VertexRange<I, V, E, M> vertexRange =
-                        new VertexRange<I, V, E, M>(
-                            null, -1, null, vertexIdMax, null);
-                    vertexRangeMap.put(vertexIdMax, vertexRange);
-                    vertexIdMax = null;
-                }
+        return vertexList;
+    }
 
-                if (vertexIdMax == null) {
-                    vertexIdMax = vertexList.get(i).getVertexId();
-                } else {
-                    @SuppressWarnings("unchecked")
-                    int compareTo =
-                        vertexList.get(i).getVertexId().compareTo(vertexIdMax);
-                    if (compareTo > 0) {
-                        vertexIdMax = vertexList.get(i).getVertexId();
-                    }
-                }
+    /**
+     * Partition the vertices into their respective VertexRanges.  The number of
+     * vertex ranges may be up to half of the number of available workers and
+     * must reach a minimum size.
+     *
+     * @param inputSplit inputSplit for these vertices
+     * @param vertexList vertices from which to build the ranges
+     * @return Ordered map of max vertex ID in range <-> vertices within range
+     * @throws InstantiationException
+     * @throws IllegalAccessException
+     * @throws IOException
+     */
+    private NavigableMap<I, VertexRange<I, V, E, M>> getVertexRanges(
+        InputSplit inputSplit, List<Vertex<I, V, E, M>> vertexList)
+        throws InstantiationException, IllegalAccessException, IOException {
+
+        NavigableMap<I, VertexRange<I, V, E, M>> vertexRangeMap =
+            new TreeMap<I, VertexRange<I, V, E, M>>();
+        long vertexRangesPerInputSplit = getVertexRangesPerInputSplit(inputSplit);
+        long vertexRangeSize = vertexList.size() / vertexRangesPerInputSplit;
+        long minPerVertexRange =
+            getConfiguration().getLong(
+                GiraphJob.MIN_VERTICES_PER_RANGE,
+                GiraphJob.MIN_VERTICES_PER_RANGE_DEFAULT);
+        if (vertexRangeSize < minPerVertexRange) {
+            vertexRangeSize = minPerVertexRange;
+        }
+        I vertexIdMax = null;
+
+        // Identify the endpoints of the ranges, create empty placeholders
+        for (int i = 0; i < vertexList.size(); ++i) {
+            if ((vertexIdMax != null) && ((i % vertexRangeSize) == 0)) {
+                VertexRange<I, V, E, M> vertexRange =
+                    new VertexRange<I, V, E, M>(
+                        null, -1, null, vertexIdMax, null);
+                vertexRangeMap.put(vertexIdMax, vertexRange);
+                vertexIdMax = null;
             }
+
             if (vertexIdMax == null) {
-                throw new RuntimeException("loadVertices: Encountered " +
-                                           "impossible null vertexIdMax.");
-            }
-            VertexRange<I, V, E, M> vertexRange =
-                new VertexRange<I, V, E, M>(
-                    null, -1, null, vertexIdMax, null);
-            vertexRangeMap.put(vertexIdMax, vertexRange);
-
-            Iterator<I> maxIndexVertexMapIt =
-                vertexRangeMap.keySet().iterator();
-            I currentVertexIndexMax = maxIndexVertexMapIt.next();
-            for (Vertex<I, V, E, M> vertex : vertexList) {
+                vertexIdMax = vertexList.get(i).getVertexId();
+            } else {
                 @SuppressWarnings("unchecked")
                 int compareTo =
-                    vertex.getVertexId().compareTo(
-                        currentVertexIndexMax);
+                    vertexList.get(i).getVertexId().compareTo(vertexIdMax);
                 if (compareTo > 0) {
-                    if (!maxIndexVertexMapIt.hasNext()) {
-                        throw new RuntimeException(
-                            "loadVertices: Impossible that vertex " +
-                            vertex.getVertexId() + " > " +
-                            currentVertexIndexMax);
-                    }
-                    currentVertexIndexMax = maxIndexVertexMapIt.next();
+                    vertexIdMax = vertexList.get(i).getVertexId();
                 }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("loadVertices: Adding vertex with index = " +
-                              vertex.getVertexId() + " to vertex range max = " +
-                              currentVertexIndexMax);
-                }
-                if (vertexRangeMap.get(currentVertexIndexMax).
-                        getVertexMap().put(vertex.getVertexId(),
-                                           vertex) != null) {
-                    throw new IllegalStateException(
-                        "loadVertices: Already contains vertex " +
-                        vertex.toString() + " in vertex range max " +
+            }
+        }
+        if (vertexIdMax == null) {
+            throw new RuntimeException("loadVertices: Encountered " +
+                                       "impossible null vertexIdMax.");
+        }
+        VertexRange<I, V, E, M> finalRange =
+            new VertexRange<I, V, E, M>(null, -1, null, vertexIdMax, null);
+        vertexRangeMap.put(vertexIdMax, finalRange);
+
+        // Now iterate over the defined ranges, placing each vertex in its range
+        Iterator<I> maxIndexVertexMapIt = vertexRangeMap.keySet().iterator();
+        I currentVertexIndexMax = maxIndexVertexMapIt.next();
+        for (Vertex<I, V, E, M> vertex : vertexList) {
+            @SuppressWarnings("unchecked")
+            int compareTo = vertex.getVertexId().compareTo(currentVertexIndexMax);
+            if (compareTo > 0) {
+                if (!maxIndexVertexMapIt.hasNext()) {
+                    throw new RuntimeException(
+                        "loadVertices: Impossible that vertex " +
+                        vertex.getVertexId() + " > " +
                         currentVertexIndexMax);
                 }
+                currentVertexIndexMax = maxIndexVertexMapIt.next();
             }
-            Map<I, List<Long>> maxIndexStatMap = new TreeMap<I, List<Long>>();
-            for (Entry<I, VertexRange<I, V, E, M>> entry :
-                    vertexRangeMap.entrySet()) {
-                List<Long> statList = new ArrayList<Long>();
-                long vertexRangeEdgeCount = 0;
-                for (BasicVertex<I, V, E, M> vertex :
-                        entry.getValue().getVertexMap().values()) {
-                    vertexRangeEdgeCount += vertex.getOutEdgeMap().size();
-                }
-                statList.add(Long.valueOf
-                                 (entry.getValue().getVertexMap().size()));
-                statList.add(Long.valueOf(vertexRangeEdgeCount));
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("loadVertices: Got " + statList.get(0) +
-                             " vertices and " + statList.get(1) +
-                             " edges from vertex range max index " +
-                             entry.getKey());
-                }
-                maxIndexStatMap.put(entry.getKey(), statList);
-
-                // Add the local vertex ranges to the stored vertex ranges
-                getStorableVertexRangeMap().put(entry.getKey(),
-                                                entry.getValue());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("loadVertices: Adding vertex with index = " +
+                          vertex.getVertexId() + " to vertex range max = " +
+                          currentVertexIndexMax);
+            }
+            VertexRange<I, V, E, M> range =
+                vertexRangeMap.get(currentVertexIndexMax);
+            SortedMap<I, BasicVertex<I, V, E, M>> vertexMap =
+                range.getVertexMap();
+            if (vertexMap.put(vertex.getVertexId(), vertex) != null) {
+                throw new IllegalStateException(
+                    "loadVertices: Already contains vertex " +
+                    vertex.toString() + " in vertex range max " +
+                    currentVertexIndexMax);
             }
-            setInputSplitVertexRanges(inputSplitPath, maxIndexStatMap);
         }
+        return vertexRangeMap;
+    }
+
+    /**
+     * Determine the number of VertexRanges per InputSplit.
+     *
+     * @param inputSplit inputSplit to generate number for
+     * @return number of VertexRanges per split
+     */
+    private long getVertexRangesPerInputSplit(InputSplit inputSplit) {
+        // ZooKeeper limits the data in a single znode to 1 MB, and
+        // each entry can be, on average, somewhat more than 300 bytes
+        final long
+            maxVertexRangesPerInputSplit = 1024 * 1024 / 350 / inputSplitCount;
+
+        long vertexRangesPerInputSplit = (long) (inputSplitCount *
+            getConfiguration().getFloat(
+                GiraphJob.TOTAL_INPUT_SPLIT_MULTIPLIER,
+                GiraphJob.TOTAL_INPUT_SPLIT_MULTIPLIER_DEFAULT));
+        if (vertexRangesPerInputSplit == 0) {
+            vertexRangesPerInputSplit = 1;
+        } else if (vertexRangesPerInputSplit > maxVertexRangesPerInputSplit) {
+            LOG.warn("loadVertices: Using " + maxVertexRangesPerInputSplit +
+                     " instead of " + vertexRangesPerInputSplit +
+                     " vertex ranges on input split " + inputSplit);
+            vertexRangesPerInputSplit = maxVertexRangesPerInputSplit;
+        }
+        return vertexRangesPerInputSplit;
+    }
+
+    /**
+     * Generate a mapping of max vertex indices to two-element lists consisting
+     * of the number of vertices and the number of edges in each partition.
+     * Additionally, adds entries of the vertexRangeMap to local map.
+     *
+     * @param vertexRangeMap Mapping of indices to VertexRanges
+     * @return  mapping of each range's max vertex index to its stats;
+     *          never null
+     */
+    private Map<I, List<Long>> getMaxIndexStatMap(
+        NavigableMap<I, VertexRange<I, V, E, M>> vertexRangeMap) {
+        Map<I, List<Long>> maxIndexStatMap = new TreeMap<I, List<Long>>();
+
+        for (Entry<I, VertexRange<I, V, E, M>> entry :
+                 vertexRangeMap.entrySet()) {
+            List<Long> statList = new ArrayList<Long>(2);
+            long vertexRangeEdgeCount = 0;
+            for (BasicVertex<I, V, E, M> vertex :
+                entry.getValue().getVertexMap().values()) {
+                vertexRangeEdgeCount += vertex.getOutEdgeMap().size();
+            }
+            statList.add(Long.valueOf(entry.getValue().getVertexMap().size()));
+            statList.add(Long.valueOf(vertexRangeEdgeCount));
+            if (LOG.isInfoEnabled()) {
+                LOG.info("loadVertices: Got " + statList.get(0) +
+                     " vertices and " + statList.get(1) +
+                     " edges from vertex range max index " + entry.getKey());
+            }
+            maxIndexStatMap.put(entry.getKey(), statList);
+
+            // Add the local vertex ranges to the stored vertex ranges
+            getStorableVertexRangeMap().put(entry.getKey(), entry.getValue());
+        }
+        return maxIndexStatMap;
     }
 
     @Override


Reply via email to