Extending the API with a master.compute() function.

                 Key: GIRAPH-127
                 URL: https://issues.apache.org/jira/browse/GIRAPH-127
             Project: Giraph
          Issue Type: New Feature
          Components: bsp, examples, graph
            Reporter: Semih Salihoglu

First of all, sorry for the long explanation of this feature.

I want to extend the Giraph API with a new function called master.compute(),
which would get called on the master before each superstep. I will try to
explain the purpose it would serve with an example. Let's say we want to
implement the following simplified version of the k-means clustering algorithm.
Pseudocode below:
  Input: G(V, E), k, numEdgesThreshold, maxIterations
  Algorithm:
    int numEdgesCrossingClusters = Integer.MAX_VALUE;
    int iterationNo = 0;
    while (numEdgesCrossingClusters > numEdgesThreshold
           && iterationNo < maxIterations) {
      iterationNo++;
      int[] clusterCenters = pickKClusterCenters(k, G);
      findClusterCenters(G, clusterCenters);
      numEdgesCrossingClusters = countNumEdgesCrossingClusters();
    }
The algorithm goes through the following steps in each iteration:
1) Pick k random initial cluster centers.
2) Assign each vertex to the cluster center that it's closest to (in Giraph,
this can be implemented with message passing, similar to how ShortestPaths is
implemented).
3) Count the number of edges crossing clusters.
4) Go back to step 1 if more than numEdgesThreshold edges cross clusters and we
haven't exceeded the maximum number of iterations yet.
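
For reference, the four steps can be run end to end on a single machine. The
sketch below is plain Java, not Giraph code; the class name
SequentialKMeansSketch, the adjacency-list graph representation, and the use of
a multi-source BFS for step 2 (a vertex joins whichever center reaches it first)
are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.Random;

/** Single-machine sketch of the simplified k-means loop (plain Java, not Giraph code). */
class SequentialKMeansSketch {

  /** Step 1: pick k distinct random vertex ids from [0, n). Assumes k <= n. */
  static int[] pickKClusterCenters(int k, int n, Random rng) {
    int[] ids = new int[n];
    for (int i = 0; i < n; i++) ids[i] = i;
    // Partial Fisher-Yates shuffle: the first k slots end up as a uniform random sample.
    for (int i = 0; i < k; i++) {
      int j = i + rng.nextInt(n - i);
      int tmp = ids[i];
      ids[i] = ids[j];
      ids[j] = tmp;
    }
    return Arrays.copyOf(ids, k);
  }

  /** Step 2: multi-source BFS; cluster[v] is the center that reaches v first (-1 if none). */
  static int[] assignClusters(int[][] adj, int[] centers) {
    int[] cluster = new int[adj.length];
    Arrays.fill(cluster, -1); // -1 plays the role of (Null, Null)
    Deque<Integer> queue = new ArrayDeque<>();
    for (int c : centers) {
      cluster[c] = c;
      queue.add(c);
    }
    while (!queue.isEmpty()) {
      int u = queue.poll();
      for (int v : adj[u]) {
        if (cluster[v] == -1) {
          cluster[v] = cluster[u];
          queue.add(v);
        }
      }
    }
    return cluster;
  }

  /** Step 3: count undirected edges whose endpoints lie in different clusters. */
  static int countNumEdgesCrossingClusters(int[][] adj, int[] cluster) {
    int count = 0;
    for (int u = 0; u < adj.length; u++) {
      for (int v : adj[u]) {
        if (u < v && cluster[u] != cluster[v]) count++; // u < v counts each edge once
      }
    }
    return count;
  }

  /** Steps 1-4 glued together; returns the number of iterations that ran. */
  static int run(int[][] adj, int k, int numEdgesThreshold, int maxIterations, long seed) {
    Random rng = new Random(seed);
    int numEdgesCrossingClusters = Integer.MAX_VALUE;
    int iterationNo = 0;
    while (numEdgesCrossingClusters > numEdgesThreshold && iterationNo < maxIterations) {
      iterationNo++;
      int[] centers = pickKClusterCenters(k, adj.length, rng);
      int[] cluster = assignClusters(adj, centers);
      numEdgesCrossingClusters = countNumEdgesCrossingClusters(adj, cluster);
    }
    return iterationNo;
  }
}
```

For example, on two triangles {0, 1, 2} and {3, 4, 5} joined by the edge 2-3,
centers {0, 5} give clusters [0, 0, 0, 5, 5, 5] and one crossing edge.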

In an algorithm like this, steps 2 and 3 are where most of the work happens,
and both have very neat message-passing implementations. I'll try to give an
overview without going into the details. Let's say we define a Vertex in Giraph
whose value is a custom Writable object holding 2 integers, and which sends
messages with up to 2 integers.
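
A value like this could be sketched as follows. TwoIntValue is a hypothetical
name; it mirrors the write(DataOutput)/readFields(DataInput) shape of Hadoop's
Writable but does not implement that interface, so it compiles without Hadoop
on the classpath. Modeling (Null, Null) with a boolean flag is also an
assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical vertex value / message holding up to two ints, e.g. (clusterCenter, distance). */
class TwoIntValue {
  private boolean isNull = true; // models the (Null, Null) state
  private int first;
  private int second;

  static TwoIntValue nullValue() {
    return new TwoIntValue();
  }

  static TwoIntValue of(int first, int second) {
    TwoIntValue v = new TwoIntValue();
    v.isNull = false;
    v.first = first;
    v.second = second;
    return v;
  }

  boolean isNull() { return isNull; }
  int first() { return first; }
  int second() { return second; }

  /** Same shape as Writable.write: a null flag, then the two ints only if present. */
  void write(DataOutput out) throws IOException {
    out.writeBoolean(isNull);
    if (!isNull) {
      out.writeInt(first);
      out.writeInt(second);
    }
  }

  /** Same shape as Writable.readFields: overwrites this object's state from the stream. */
  void readFields(DataInput in) throws IOException {
    isNull = in.readBoolean();
    first = isNull ? 0 : in.readInt();
    second = isNull ? 0 : in.readInt();
  }

  /** Serializes and deserializes a value, as a quick round-trip check. */
  static TwoIntValue roundTrip(TwoIntValue v) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      v.write(new DataOutputStream(bytes));
      TwoIntValue copy = new TwoIntValue();
      copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
      return copy;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Writing the null flag first keeps (Null, Null) values to a single byte on the
wire, which matters when most vertices have not been reached yet.
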
Step 2 is very similar to the ShortestPaths algorithm and has two stages. In the
first stage, each vertex checks whether it is one of the cluster centers. If so,
it assigns itself the value (id, 0); otherwise it assigns itself (Null, Null).
In the second stage, each vertex assigns itself to the minimum-distance cluster
center by comparing its current value with its neighbors' (cluster center,
distance) values, received as 2-integer messages, and changes its value whenever
it finds a closer cluster center. This stage repeats for as many supersteps as
it takes for every vertex to converge.
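
The two stages of step 2 can be simulated in memory to show the message flow.
This is a plain-Java sketch, not the Giraph Vertex API: the class
ClusterAssignmentSimulation is hypothetical, only vertices whose value changed
send messages (as in ShortestPaths), each message carries (cluster center,
sender's distance + 1) assuming unit edge weights, and ties between equally
close centers go to the smaller center id.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** In-memory simulation of step 2's two stages as BSP supersteps (not the Giraph Vertex API). */
class ClusterAssignmentSimulation {
  static final int NONE = -1; // stands in for (Null, Null)

  static final class Result {
    final int[] center;
    final int[] dist;
    final int supersteps;

    Result(int[] center, int[] dist, int supersteps) {
      this.center = center;
      this.dist = dist;
      this.supersteps = supersteps;
    }
  }

  static Result run(int[][] adj, int[] centers) {
    int n = adj.length;
    int[] center = new int[n];
    int[] dist = new int[n];
    Arrays.fill(center, NONE);
    Arrays.fill(dist, Integer.MAX_VALUE);
    boolean[] changed = new boolean[n];

    // Stage 1 (first superstep): cluster centers assign themselves (id, 0).
    for (int c : centers) {
      center[c] = c;
      dist[c] = 0;
      changed[c] = true;
    }
    int supersteps = 1;

    // Stage 2: vertices whose value changed send (center, dist + 1) to every neighbor;
    // each receiver keeps the closest offer (ties go to the smaller center id).
    while (true) {
      List<List<int[]>> inbox = new ArrayList<>();
      for (int v = 0; v < n; v++) inbox.add(new ArrayList<>());
      boolean anyMessage = false;
      for (int u = 0; u < n; u++) {
        if (!changed[u]) continue;
        for (int v : adj[u]) {
          inbox.get(v).add(new int[] {center[u], dist[u] + 1});
          anyMessage = true;
        }
      }
      if (!anyMessage) break; // no messages in flight: every vertex has converged
      supersteps++;
      Arrays.fill(changed, false);
      for (int v = 0; v < n; v++) {
        for (int[] msg : inbox.get(v)) {
          if (msg[1] < dist[v] || (msg[1] == dist[v] && msg[0] < center[v])) {
            center[v] = msg[0];
            dist[v] = msg[1];
            changed[v] = true;
          }
        }
      }
    }
    return new Result(center, dist, supersteps);
  }
}
```

On a graph of two triangles {0, 1, 2} and {3, 4, 5} joined by the edge 2-3,
with centers {0, 5}, the simulation converges after 3 supersteps with distances
[0, 1, 1, 1, 1, 0].
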
Step 3, counting the number of edges crossing clusters, is also very easy to
implement in Giraph. Once each vertex has a cluster center, the number of edges
crossing clusters can be counted with an aggregator, say one called
"num-edges-crossing". It again has two stages. In the first stage, every vertex
just sends its cluster id to all its neighbors. In the second stage, every
vertex looks at its neighbors' cluster ids in the messages, and for each cluster
id that is not equal to its own cluster id, it increments "num-edges-crossing"
by 1.
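
Step 3 can be simulated the same way. In this hypothetical sketch
(CrossingEdgeCountSimulation), an AtomicLong stands in for the
"num-edges-crossing" aggregator and the graph is an adjacency list. If each
undirected edge is stored as two directed edges, both endpoints see the
mismatch, so every crossing edge is counted twice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** In-memory simulation of step 3 with a sum aggregator (not the Giraph API). */
class CrossingEdgeCountSimulation {
  static long run(int[][] adj, int[] cluster) {
    int n = adj.length;
    // Stage 1: every vertex sends its cluster id to all of its neighbors.
    List<List<Integer>> inbox = new ArrayList<>();
    for (int v = 0; v < n; v++) inbox.add(new ArrayList<>());
    for (int u = 0; u < n; u++) {
      for (int v : adj[u]) inbox.get(v).add(cluster[u]);
    }
    // Stage 2: each vertex adds 1 to "num-edges-crossing" per differing cluster id.
    AtomicLong numEdgesCrossing = new AtomicLong(); // stands in for the aggregator
    for (int v = 0; v < n; v++) {
      for (int neighborCluster : inbox.get(v)) {
        if (neighborCluster != cluster[v]) numEdgesCrossing.incrementAndGet();
      }
    }
    return numEdgesCrossing.get();
  }
}
```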

The other 2 steps, 1 and 4, are very simple sequential computations. Step 1
just picks k random vertex ids and puts them into an aggregator. Step 4 just
compares "num-edges-crossing" against a threshold and also checks whether the
algorithm has exceeded maxIterations (iterations through steps 1-4, not
supersteps). With the current API, it's not clear where to do these
computations. There is a per-worker function, preSuperstep(), that can be
implemented, but if we pick a special worker, say worker 1, to pick the k
vertices (in preSuperstep(), putting them into an aggregator), we would waste an
entire superstep in which only worker 1 does work and all other workers are
idle. Doing this in worker 1's postSuperstep() would not work either, because
worker 1 needs to know that all the vertices have converged before it can tell
that it's time to pick k vertices, or time to do the check in step 4, and that
information only becomes available to it at the beginning of the next
superstep.

A master.compute() extension would run on the master before each superstep. In
the k-means example, it would set the aggregator holding the k vertices before
the aggregators are broadcast to the workers. These are all very short
sequential computations, so running them on the master would not waste
resources the way a preSuperstep() or postSuperstep() approach would. It would
also enable new algorithms like k-means that are composed of vertex-centric
computations glued together by small sequential ones. In short, it would let
Giraph mix in sequential computation without wasting supersteps.
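
To make the control flow concrete, here is a sketch of what a master-side
k-means driver might look like. The MasterCompute interface, the aggregator
names "phase", "cluster-centers" and "num-changed", and the assumption that
workers write "num-changed" (how many vertex values changed in the last
superstep) are all hypothetical; nothing here is an existing Giraph API.
Aggregators are modeled as a plain Map that the master may overwrite before it
is broadcast.

```java
import java.util.Map;
import java.util.Random;

/** Hypothetical master-side hook, called once before each superstep (not an existing Giraph API). */
interface MasterCompute {
  /**
   * Reads aggregator values produced by the previous superstep and may overwrite them before
   * they are broadcast to the workers. Returns false to end the whole computation.
   */
  boolean compute(long superstep, Map<String, Object> aggregators);
}

/** Sketch of the k-means example's sequential glue (steps 1 and 4) running on the master. */
class KMeansMasterSketch implements MasterCompute {
  enum Phase { ASSIGN, SEND_CLUSTER_IDS, COUNT_CROSSING }

  private final int k;
  private final int numVertices;
  private final long numEdgesThreshold;
  private final int maxIterations;
  private final Random rng;
  private Phase phase;
  private int iterationNo = 0;

  KMeansMasterSketch(int k, int numVertices, long numEdgesThreshold, int maxIterations, long seed) {
    this.k = k;
    this.numVertices = numVertices;
    this.numEdgesThreshold = numEdgesThreshold;
    this.maxIterations = maxIterations;
    this.rng = new Random(seed);
  }

  @Override
  public boolean compute(long superstep, Map<String, Object> aggregators) {
    if (superstep == 0) {
      return startIteration(aggregators);
    }
    switch (phase) {
      case ASSIGN:
        // Assumes workers report how many vertex values changed; 0 means step 2 converged.
        if (((Long) aggregators.get("num-changed")) == 0L) {
          phase = Phase.SEND_CLUSTER_IDS;
        }
        break;
      case SEND_CLUSTER_IDS:
        aggregators.put("num-edges-crossing", 0L); // reset before workers start counting
        phase = Phase.COUNT_CROSSING;
        break;
      case COUNT_CROSSING:
        // Step 4: stop once few enough edges cross clusters, else start a new iteration.
        long crossing = (Long) aggregators.get("num-edges-crossing");
        if (crossing <= numEdgesThreshold) {
          return false;
        }
        return startIteration(aggregators);
    }
    aggregators.put("phase", phase.name());
    return true;
  }

  private boolean startIteration(Map<String, Object> aggregators) {
    if (iterationNo >= maxIterations) {
      return false; // maxIterations counts iterations of steps 1-4, not supersteps
    }
    iterationNo++;
    // Step 1: pick k distinct random vertex ids (assumes k <= numVertices).
    int[] centers = rng.ints(0, numVertices).distinct().limit(k).toArray();
    aggregators.put("cluster-centers", centers);
    phase = Phase.ASSIGN;
    aggregators.put("phase", phase.name());
    return true;
  }
}
```

The master never touches individual vertices; it only inspects a few aggregator
values and decides what the workers should do in the next superstep, so the
sequential glue does not cost an extra superstep.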

I am a PhD student at Stanford and I have been working on my own BSP/Pregel
implementation, called GPS, since last year. I haven't distributed it, mainly
because in September I learned about Giraph and decided to slow down work on it
:). We have basically been using GPS as our own research platform. The source
code for GPS is here if anyone is interested
(https://subversion.assembla.com/svn/phd-projects/gps/trunk/). We have the
master.compute() feature in GPS, and here's an example of a k-means
implementation in GPS with master.compute():
(Aggregators are called GlobalObjects in GPS.) There is another example, which
I'll skip explaining because it's very detailed and would make points similar
to the ones I am making with k-means. In general, master.compute() would make
it possible to glue together any graph algorithm that is composed of multiple
stages, each with its own message types and a computation well suited to
vertex.compute(). There are many examples of such algorithms: recursive
partitioning, triangle counting, and even much simpler things like finding
shortest paths to 100 vertices in pieces (first to 5 vertices, then to another
5, then to another 5, etc.). That would help because finding shortest paths to
100 vertices at once requires very large messages (each message would need to
hold 100 integers).
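
Splitting the target vertices into batches is the kind of bookkeeping a master
could do between phases. The sketch below (hypothetical class SourceBatches,
plain Java, unweighted undirected graph assumed) splits the vertex ids into
fixed-size batches and computes, for one batch, the per-vertex distance vector
that a batched phase would carry. Its length equals the batch size, which is
why batches of 5 keep messages much smaller than handling 100 vertices at once.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Hypothetical helpers for computing shortest paths to many vertices in small batches. */
class SourceBatches {

  /** Splits the vertex ids into consecutive batches of at most batchSize ids. */
  static List<int[]> split(int[] sources, int batchSize) {
    List<int[]> batches = new ArrayList<>();
    for (int start = 0; start < sources.length; start += batchSize) {
      int end = Math.min(start + batchSize, sources.length);
      batches.add(Arrays.copyOfRange(sources, start, end));
    }
    return batches;
  }

  /**
   * Unit-weight distances for one batch: dist[v][i] is the distance between batch[i] and v,
   * or -1 if unreachable. Each row has batch.length entries, i.e. the size of the per-vertex
   * state (and message) that one batched phase would need.
   */
  static int[][] distancesFromBatch(int[][] adj, int[] batch) {
    int[][] dist = new int[adj.length][batch.length];
    for (int[] row : dist) Arrays.fill(row, -1);
    for (int i = 0; i < batch.length; i++) {
      Deque<Integer> queue = new ArrayDeque<>();
      dist[batch[i]][i] = 0;
      queue.add(batch[i]);
      while (!queue.isEmpty()) {
        int u = queue.poll();
        for (int v : adj[u]) {
          if (dist[v][i] == -1) {
            dist[v][i] = dist[u][i] + 1;
            queue.add(v);
          }
        }
      }
    }
    return dist;
  }
}
```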

If the Giraph team approves, I would like to implement this feature in Giraph
with an approach similar to the one I took in GPS. Overall:
1) Add a Master.java to org.apache.giraph.graph as the default master, with a
compute function that by default aggregates all aggregators and checks whether
the computation has ended (by comparing numVertices with numFinishedVertices).
This would be a refactoring of the org.apache.giraph.graph.BspServiceMaster
class (as far as I can see).
2) Extend GiraphJob with a setMaster() method to set a master class (by
default, the default master above).
3) The rest would be sending the custom master class to probably all workers,
with only the master instantiating it via reflection. I need to learn more about
how to do this; I am not familiar with that part of the Giraph code base yet.
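
The proposed shape could look roughly like the following. DefaultMaster,
HalfFinishedMaster and JobConfigSketch are hypothetical stand-ins for the
proposed Master.java, a user-defined master, and the GiraphJob.setMaster()
option; none of them are existing Giraph classes. The default compute() only
implements the numVertices/numFinishedVertices check (aggregation is omitted),
and the master class is instantiated through its no-argument constructor via
reflection.

```java
/** Hypothetical stand-in for the proposed default Master.java (not an existing Giraph class). */
class DefaultMaster {
  /**
   * Default check from the proposal: the computation continues until every vertex has
   * finished. Aggregation of aggregators is omitted from this sketch. Returns false to stop.
   */
  public boolean compute(long numVertices, long numFinishedVertices) {
    return numFinishedVertices < numVertices;
  }
}

/** Example custom master (hypothetical): stops once at least half of the vertices finish. */
class HalfFinishedMaster extends DefaultMaster {
  @Override
  public boolean compute(long numVertices, long numFinishedVertices) {
    return 2 * numFinishedVertices < numVertices;
  }
}

/** Stand-in for a job configuration with a setMaster() option (hypothetical). */
class JobConfigSketch {
  private Class<? extends DefaultMaster> masterClass = DefaultMaster.class; // the default

  void setMaster(Class<? extends DefaultMaster> masterClass) {
    this.masterClass = masterClass;
  }

  /** In the proposal, only the master process would call this; workers just receive the class. */
  DefaultMaster instantiateMaster() {
    try {
      return masterClass.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate " + masterClass.getName(), e);
    }
  }
}
```

Requiring a no-argument constructor keeps reflective instantiation simple;
anything the master needs would then come from the job configuration.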
