Author: edwardyoon
Date: Wed Mar  5 11:35:01 2014
New Revision: 1574446

URL: http://svn.apache.org/r1574446
Log:
HAMA-884: Add Combiners and Aggregators API guide to website

Modified:
    hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml
    hama/trunk/src/site/xdoc/hama_graph_tutorial.xml

Modified: hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml
URL: http://svn.apache.org/viewvc/hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml?rev=1574446&r1=1574445&r2=1574446&view=diff
==============================================================================
--- hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml (original)
+++ hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml Wed Mar  5 11:35:01 2014
@@ -149,7 +149,7 @@ xsi:schemaLocation="http://maven.apache.
 
     peer.sync();
   }</pre>
-    
+  
     <h4>Synchronization</h4>
 
     <p>When all the processes have entered the barrier via the sync() method, 
@@ -173,7 +173,44 @@ xsi:schemaLocation="http://maven.apache.
     }
   }</pre>
   
-  
+    <h4>Combiners</h4>
+    <p>Combiners aggregate messages on the sender side to reduce communication overhead when messages can be summarized arithmetically, e.g., with min, max, sum, or average. Suppose that every processor sends the integers 0 to 999 to a specific processor (masterTask), which sums all the integers it receives:</p>
+    <pre>
+    public void bsp(BSPPeer&lt;NullWritable, NullWritable, NullWritable, NullWritable&gt; peer)
+        throws IOException, SyncException, InterruptedException {
+
+      // masterTask is the name of the receiving peer, defined elsewhere in the class
+      for (int i = 0; i &lt; 1000; i++) {
+        peer.send(masterTask, new IntegerMessage(peer.getPeerName(), i));
+      }
+      peer.sync();
+
+      if (peer.getPeerName().equals(masterTask)) {
+        int sum = 0;
+        IntegerMessage received;
+        while ((received = (IntegerMessage) peer.getCurrentMessage()) != null) {
+          sum += received.getData();
+        }
+      }
+    }</pre>
+    <p>In the previous example, each BSP processor sends a bundle of 1,000 integer messages to masterTask. Instead, you can use a Combiner in your BSP program to sum the integer messages before they are sent, so that each processor delivers a single message and the code stays concise and maintainable:</p>
+    <pre>
+  public static class SumCombiner extends Combiner {
+
+    @Override
+    public BSPMessageBundle combine(Iterable&lt;BSPMessage&gt; messages) {
+      BSPMessageBundle bundle = new BSPMessageBundle();
+      int sum = 0;
+
+      Iterator&lt;BSPMessage&gt; it = messages.iterator();
+      while (it.hasNext()) {
+        sum += ((IntegerMessage) it.next()).getData();
+      }
+
+      bundle.addMessage(new IntegerMessage("Sum", sum));
+      return bundle;
+    }
+
+  }</pre>
+    
     <subsection name="Shell Command Line Interfaces"></subsection>
     <p>Hama provides several commands for BSP job administration:</p>
     
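The SumCombiner above depends on Hama's BSPMessage API. The following library-free sketch shows the same reduction in isolation; IntMessage and SumCombinerSketch are hypothetical stand-ins, not Hama classes. It illustrates what the combiner saves: the 1,000 messages one peer queues for masterTask collapse into a single message carrying their sum.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for IntegerMessage: a tag plus an int payload.
final class IntMessage {
  final String tag;
  final int data;

  IntMessage(String tag, int data) {
    this.tag = tag;
    this.data = data;
  }
}

// Hypothetical, library-free version of SumCombiner: it folds all messages
// queued for one destination into a single message carrying their sum.
final class SumCombinerSketch {

  static List<IntMessage> combine(Iterable<IntMessage> messages) {
    int sum = 0;
    for (IntMessage m : messages) {
      sum += m.data;
    }
    List<IntMessage> bundle = new ArrayList<>();
    bundle.add(new IntMessage("Sum", sum));
    return bundle;
  }

  public static void main(String[] args) {
    // One peer queues 1,000 messages (0..999) for the master task.
    List<IntMessage> outgoing = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
      outgoing.add(new IntMessage("peer-0", i));
    }
    List<IntMessage> combined = combine(outgoing);
    // prints "1 message(s), sum = 499500" (0 + 1 + ... + 999)
    System.out.println(combined.size() + " message(s), sum = " + combined.get(0).data);
  }
}
```

Combining only pays off because addition is associative and commutative: the receiver gets the same total whether it adds 1,000 values or one pre-summed value.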

Modified: hama/trunk/src/site/xdoc/hama_graph_tutorial.xml
URL: http://svn.apache.org/viewvc/hama/trunk/src/site/xdoc/hama_graph_tutorial.xml?rev=1574446&r1=1574445&r2=1574446&view=diff
==============================================================================
--- hama/trunk/src/site/xdoc/hama_graph_tutorial.xml (original)
+++ hama/trunk/src/site/xdoc/hama_graph_tutorial.xml Wed Mar  5 11:35:01 2014
@@ -27,7 +27,7 @@ xsi:schemaLocation="http://maven.apache.
     <p>Hama includes the Graph package for vertex-centric graph computations.
    Hama's Graph package allows you to program Google's Pregel-style applications with a simple programming interface.</p>
 
-    <subsection name="Vertex API"></subsection>    
+    <subsection name="Vertex"></subsection>    
 
     <p>Writing a Hama graph application involves subclassing the predefined Vertex class. Its template arguments define three value types, associated with vertices, edges, and messages.</p>
     <pre>
@@ -41,7 +41,7 @@ xsi:schemaLocation="http://maven.apache.
 
    <p>The user overrides the compute() method, which is executed at each active vertex in every superstep. Predefined Vertex methods allow compute() to query information about the current vertex and its edges, and to send messages to other vertices. compute() can inspect the value associated with its vertex via getValue().</p>
 
-   <subsection name="VertexReader API"></subsection>
+   <subsection name="VertexReader"></subsection>
    <p>You can create your own VertexReader for your data format by extending the org.apache.hama.graph.<b>VertexInputReader</b> class.
 
    For example, a sequence file that contains a linked list of vertices can be parsed as follows:
@@ -63,7 +63,40 @@ xsi:schemaLocation="http://maven.apache.
     }
   }
 </pre>
+   <subsection name="Combiners"></subsection>
+   <p>Sending a message to a vertex that resides on a different machine incurs communication overhead. If the algorithm does not need each individual message but only a function of them (for example, their sum), a combiner can merge the messages addressed to the same vertex into one.</p>
+   <h4>Write your own Combiner</h4>
+   <p>To write your own combiner, extend the Combiner class and implement its #combine(Iterable&lt;M&gt; messages) method.
+   For an example, see the implementation of MinIntCombiner in the org.apache.hama.example.SSSP example.</p>
 
+   <subsection name="Aggregators"></subsection>
+   <p>Aggregators are a mechanism for global communication, monitoring, and data aggregation. Each vertex can provide a value to an aggregator in superstep S; the system combines those values using a reduction operator, and the resulting value is made available to all vertices in superstep S + 1.
+   </p>
+   <h4>Registering aggregators</h4>
+   <p>To start using aggregators, you must declare them in your GraphJob:</p>
+   <pre>
+  HamaConfiguration conf = new HamaConfiguration(new Configuration());
+  GraphJob graphJob = new GraphJob(conf, MyClass.class);
+
+  // To add an average aggregator
+  graphJob.setAggregatorClass(AverageAggregator.class);
+
+  // To add a sum aggregator
+  graphJob.setAggregatorClass(SumAggregator.class);</pre>
+   <p>Several aggregators are already implemented in the org.apache.hama.graph package, and you can also write your own.</p>
+   <h4>Start working with aggregators</h4>
+   <p>In order to aggregate values from your vertices, use:</p> 
+<pre>
+  this.aggregate(index, value);</pre>
+  
+  <p>This method is called from inside a vertex, although not every vertex has to call it. The index parameter is the position of the aggregator in registration order: the first registered aggregator has index 0, the second has index 1, and so on.</p>
+  <h4>Get results</h4>
+  <p>Inside your vertex, you can read the result of each aggregator (computed in the previous superstep) with:</p>
+  <pre>
+  this.getAggregatedValue(index);</pre>
+  
+  <h4>Write your own aggregators</h4>
+  <p>To write your own aggregator, extend the AbstractAggregator class and implement its #aggregate(M value) and #getValue() methods. For examples, see the default aggregator implementations in the org.apache.hama.graph package.</p>
    <subsection name="Example: PageRankVertex"></subsection>
    <p>To solve the PageRank problem using Hama Graph, you can extend the Vertex class to create a PageRankVertex class.
 In this example, the algorithm described in Google's Pregel paper is used. The value of a vertex represents the tentative PageRank of the vertex. The graph is initialized with each vertex value equal to 1/numOfVertices. In each of the first 30 supersteps, each vertex sends its tentative PageRank along all of its outgoing edges.
@@ -77,25 +110,29 @@ From Superstep 1 to 30, each vertex sums
 
     @Override
     public void compute(Iterable&lt;DoubleWritable&gt; messages) throws IOException {
+      // initialize this vertex to 1 / count of global vertices in this graph
       if (this.getSuperstepCount() == 0) {
-        this.setValue(new DoubleWritable(1.0 / (double) this.getNumVertices()));
-      }
-
-      if (this.getSuperstepCount() >= 1) {
+        setValue(new DoubleWritable(1.0 / this.getNumVertices()));
+      } else if (this.getSuperstepCount() >= 1) {
         double sum = 0;
-        while (messages.hasNext()) {
-          DoubleWritable msg = messages.next();
+        for (DoubleWritable msg : messages) {
           sum += msg.get();
         }
-
-        double ALPHA = (1 - 0.85) / (double) this.getNumVertices();
-        this.setValue(new DoubleWritable(ALPHA + (0.85 * sum)));
+        double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
+        setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
+        aggregate(0, this.getValue());
       }
 
-      if (this.getSuperstepCount() &lt; this.getMaxIteration()) {
-        int numEdges = this.getOutEdges().size();
+      // DAMPING_FACTOR and MAXIMUM_CONVERGENCE_ERROR are assumed to be constants of this class.
+      // Halt once the error aggregated in the previous superstep drops below the threshold.
+      DoubleWritable globalError = getAggregatedValue(0);
+
+      if (globalError != null &amp;&amp; this.getSuperstepCount() &gt; 2
+          &amp;&amp; MAXIMUM_CONVERGENCE_ERROR &gt; globalError.get()) {
+        voteToHalt();
+      } else {
+        // in each superstep we are going to send a new rank to our neighbours
         sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
-            / numEdges));
+            / this.getEdges().size()));
       }
     }
   }</pre>
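The graph tutorial points to MinIntCombiner in the SSSP example for a concrete combiner. The sketch below is not that class; MinCombinerSketch is a hypothetical, dependency-free illustration of the same kind of reduction, assuming the messages are integer tentative distances of which only the smallest matters to the receiving vertex.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a min combiner (not Hama's MinIntCombiner).
// In single-source shortest paths a vertex only needs the smallest tentative
// distance it receives, so all messages for one vertex can be replaced by
// their minimum before crossing the network.
final class MinCombinerSketch {

  // Returns the minimum message; an empty input yields Integer.MAX_VALUE,
  // which here stands for "no finite distance yet".
  static int combine(Iterable<Integer> messages) {
    int min = Integer.MAX_VALUE;
    for (int m : messages) {
      min = Math.min(min, m);
    }
    return min;
  }

  public static void main(String[] args) {
    // Three candidate distances headed for the same vertex on another machine.
    List<Integer> distances = Arrays.asList(7, 3, 12);
    System.out.println(combine(distances)); // prints 3
  }
}
```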

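The aggregator contract described in the graph tutorial (values supplied during superstep S, the reduced result read back in superstep S + 1, aggregators addressed by registration index) can be sketched without Hama. AggregatorSketch, SumAggregatorSketch, AverageAggregatorSketch and AggregationBarrier are hypothetical names; Hama's own classes live in org.apache.hama.graph and their signatures may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical aggregator contract modelled on the aggregate/getValue pair
// described in the tutorial; this is not Hama's AbstractAggregator.
interface AggregatorSketch {
  void aggregate(double value);
  double getValue();
}

final class SumAggregatorSketch implements AggregatorSketch {
  private double sum;
  public void aggregate(double value) { sum += value; }
  public double getValue() { return sum; }
}

final class AverageAggregatorSketch implements AggregatorSketch {
  private double sum;
  private long count;
  public void aggregate(double value) { sum += value; count++; }
  public double getValue() { return count == 0 ? 0.0 : sum / count; }
}

// Stand-in for the framework: aggregators are addressed by registration order,
// and values aggregated in superstep S are published at the barrier (sync)
// so that vertices can read them in superstep S + 1.
final class AggregationBarrier {
  private final List<Supplier<AggregatorSketch>> factories = new ArrayList<>();
  private final List<AggregatorSketch> active = new ArrayList<>();
  private double[] published = new double[0];

  // The first registered aggregator gets index 0, the second index 1, etc.
  void register(Supplier<AggregatorSketch> factory) {
    factories.add(factory);
    active.add(factory.get());
  }

  // Called by vertices during the current superstep.
  void aggregate(int index, double value) {
    active.get(index).aggregate(value);
  }

  // Result from the previous superstep, or null before the first sync.
  Double getAggregatedValue(int index) {
    if (index >= published.length) {
      return null;
    }
    return published[index];
  }

  // Barrier: publish this superstep's results and reset the aggregators.
  void sync() {
    published = new double[active.size()];
    for (int i = 0; i < active.size(); i++) {
      published[i] = active.get(i).getValue();
      active.set(i, factories.get(i).get());
    }
  }

  public static void main(String[] args) {
    AggregationBarrier barrier = new AggregationBarrier();
    barrier.register(AverageAggregatorSketch::new); // index 0
    barrier.register(SumAggregatorSketch::new);     // index 1

    for (double vertexValue : new double[] {1.0, 2.0, 6.0}) { // superstep S
      barrier.aggregate(0, vertexValue);
      barrier.aggregate(1, vertexValue);
    }
    System.out.println(barrier.getAggregatedValue(0)); // null: not visible during S
    barrier.sync();
    System.out.println(barrier.getAggregatedValue(0)); // 3.0 in superstep S + 1
    System.out.println(barrier.getAggregatedValue(1)); // 9.0
  }
}
```

Resetting each aggregator at the barrier keeps every superstep's result independent, which is what a per-superstep convergence check needs.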

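To see how the aggregator-based stopping rule in the new PageRankVertex behaves, here is a single-process simulation in which each loop iteration is one superstep. It is a sketch under stated assumptions, not Hama code: the damping factor 0.85 comes from the removed code, MAXIMUM_CONVERGENCE_ERROR = 0.001 is a hypothetical threshold, the global error is taken to be the sum of absolute rank changes (the tutorial does not say which aggregator is registered at index 0), and every vertex is assumed to have at least one outgoing edge.

```java
import java.util.Arrays;

// Single-process simulation of the vertex-centric PageRank (a sketch, not
// Hama code). The "aggregator" is a plain double that is only read in the
// following superstep.
final class PageRankSketch {
  static final double DAMPING_FACTOR = 0.85;             // value used in the removed code
  static final double MAXIMUM_CONVERGENCE_ERROR = 0.001; // hypothetical threshold

  // outEdges[v] lists the targets of v's outgoing edges. Every vertex is
  // assumed to have at least one out-edge (no dangling-vertex handling).
  static double[] run(int[][] outEdges, int maxSupersteps) {
    int n = outEdges.length;
    double[] rank = new double[n];
    double[] inbox = new double[n];  // summed messages delivered in this superstep
    Double globalError = null;       // aggregated in the previous superstep; null at first

    for (int step = 0; step < maxSupersteps; step++) {
      double[] outbox = new double[n];
      double error = 0.0;            // this superstep's aggregate: sum of |rank change|

      // Every vertex reads the same aggregated value, so all of them make the
      // same halting decision; it is computed once here.
      boolean converged = globalError != null && step > 2
          && MAXIMUM_CONVERGENCE_ERROR > globalError;

      for (int v = 0; v < n; v++) {
        if (step == 0) {
          rank[v] = 1.0 / n;         // initialize to 1 / number of vertices
        } else {
          double updated = (1.0 - DAMPING_FACTOR) / n + DAMPING_FACTOR * inbox[v];
          error += Math.abs(updated - rank[v]);
          rank[v] = updated;
        }
        if (!converged) {
          // send rank / out-degree along every outgoing edge
          for (int target : outEdges[v]) {
            outbox[target] += rank[v] / outEdges[v].length;
          }
        }
      }
      if (converged) {
        break;                       // all vertices voted to halt, no messages in flight
      }
      inbox = outbox;
      if (step > 0) {
        globalError = error;         // becomes visible in superstep step + 1
      }
    }
    return rank;
  }

  public static void main(String[] args) {
    int[][] graph = { {1, 2}, {2}, {0} };
    System.out.println(Arrays.toString(run(graph, 100)));
  }
}
```

Because no vertex is dangling, each vertex forwards its whole rank, so the ranks keep summing to 1 in every superstep; the threshold only decides how many supersteps run.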