Author: edwardyoon
Date: Thu Jan 26 08:43:57 2012
New Revision: 1236074

URL: http://svn.apache.org/viewvc?rev=1236074&view=rev
Log:
Add tutorial page.
Modified:
    incubator/hama/trunk/src/site/resources/css/site.css
    incubator/hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml

Modified: incubator/hama/trunk/src/site/resources/css/site.css
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/site/resources/css/site.css?rev=1236074&r1=1236073&r2=1236074&view=diff
==============================================================================
--- incubator/hama/trunk/src/site/resources/css/site.css (original)
+++ incubator/hama/trunk/src/site/resources/css/site.css Thu Jan 26 08:43:57 2012
@@ -30,10 +30,20 @@ body a:hover {
   color: #888800;
 }
 
-h2, h3, h4 {
+h2, h3, h4, h5 {
   color: #74240f;
 }
 
+h5 {
+  font-size: 15px;
+  margin-top: -0.8em;
+}
+
+table {
+  width: 95%;
+  margin-left:2em;
+}
+
 ul {
   margin-left:10px;
 }

Modified: incubator/hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml?rev=1236074&r1=1236073&r2=1236074&view=diff
==============================================================================
--- incubator/hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml (original)
+++ incubator/hama/trunk/src/site/xdoc/hama_bsp_tutorial.xml Thu Jan 26 08:43:57 2012
@@ -23,8 +23,279 @@ xsi:schemaLocation="http://maven.apache.
     <section name="Hama BSP Tutorial"></section>
     <p>This document describes the Hama BSP framework and serves as a tutorial.</p>
     <subsection name="Overview"></subsection>
-    <subsection name="Inputs and Outputs"></subsection>
-    <subsection name="BSP user interfaces"></subsection>
-    <subsection name="Example: Pi calculation"></subsection>
+    <p>Hama provides a pure BSP (Bulk Synchronous Parallel) model for message passing and collective communication.
+    A BSP program consists of a sequence of supersteps.
+    Each superstep consists of the following three phases:</p>
+
+    <ul>
+      <li>Local computation</li>
+      <li>Process communication</li>
+      <li>Barrier synchronization</li>
+    </ul>
+    <p>BSP programming enables you to write high-performance parallel computing algorithms for a wide range of scientific problems.</p>
+
+    <subsection name="Create your own BSP by extending BSP class"></subsection>
+
+    <p>To create your own BSP program, write a class that extends the org.apache.hama.bsp.<b>BSP</b> class.
+    <br/>
+    The extending class must override the bsp() method, which is declared like this:
+    </p>
+    <pre class="green">
+    public abstract void bsp(BSPPeer<K1, V1, K2, V2> peer) throws IOException,
+        SyncException, InterruptedException;</pre>
+
+    <p>
+    You define the whole BSP program inside this bsp() method.
+    It is important to understand that a call to bsp() does not correspond to a single superstep.
+    As described above, a BSP program consists of a sequence of supersteps.
+    So bsp() is called only once, not repeatedly like a Mapper or Reducer method.
+    <br/><br/>
+    <b>NOTE</b>: Optionally, you can also override setup() and cleanup(), which are called at the beginning
+    and at the end of your computation, respectively. cleanup() is guaranteed to run after the computation or in case of failure.
+    You can simply override the methods you need from the BSP class.
+    <br/><br/>
+    After you have created your own BSP class, you need to configure a <b>BSPJob</b> and submit it to a Hama cluster to execute the job.
+    The BSP job configuration and submission interfaces are almost the same as those for MapReduce jobs:</p>
+    <pre class="green">
+    HamaConfiguration conf = new HamaConfiguration();
+    BSPJob job = new BSPJob(conf, MyBSP.class);
+    job.setJobName("My BSP program");
+    job.setBspClass(MyBSP.class);
+    job.setInputFormat(NullInputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    ...
+    job.waitForCompletion(true);</pre>
+    <p>See the section below for a more detailed description of the BSP user interfaces.</p>
+
+    <subsection name="User Interfaces"></subsection>
+    <h5>Inputs and Outputs</h5>
+    <p>When setting up a BSPJob, you can provide an InputFormat, an OutputFormat, and paths like this:</p>
+    <pre class="green">
+    job.setInputPath(new Path("/tmp/sequence.dat"));
+    job.setInputFormat(org.apache.hama.bsp.SequenceFileInputFormat.class);
+    // or,
+    SequenceFileInputFormat.addInputPath(job, new Path("/tmp/sequence.dat"));
+    // or,
+    SequenceFileInputFormat.addInputPaths(job, "/tmp/seq1.dat,/tmp/seq2.dat,/tmp/seq3.dat");
+
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+    job.setOutputFormat(TextOutputFormat.class);
+    FileOutputFormat.setOutputPath(job, new Path("/tmp/result"));
+    </pre>
+
+    <p>You can then read the input and write the output from the methods of the BSP class, which take a "BSPPeer" as a parameter; it provides the communication, counter, and I/O interfaces.
+    In this case we read a normal text file:</p>
+
+    <pre class="green">
+    @Override
+    public final void bsp(
+        BSPPeer<LongWritable, Text, Text, LongWritable> peer)
+        throws IOException, InterruptedException, SyncException {
+
+      // this method reads the next key value record from file
+      KeyValuePair<LongWritable, Text> pair = peer.readNext();
+
+      // the following lines do the same:
+      LongWritable key = new LongWritable();
+      Text value = new Text();
+      peer.readNext(key, value);
+
+      // write
+      peer.write(value, key);
+    }</pre>
+    <p>Consult the docs for more detail on events like end of file.
+    There is also a function which allows you to re-read the input from the beginning.
+    This snippet reads the input five times:
+    </p>
+    <pre class="green">
+    for (int i = 0; i < 5; i++) {
+      LongWritable key = new LongWritable();
+      Text value = new Text();
+      while (peer.readNext(key, value)) {
+        // read everything
+      }
+      // reopens the input
+      peer.reopenInput();
+    }</pre>
+
+    <h5>Communication</h5>
+    <p>Hama BSP provides simple but powerful communication APIs for many purposes.
+    We tried to follow the standard library of the BSP world as closely as possible.
+    The following table describes all the methods you can use:</p>
+
+<table align="center" border="0">
+<tr><td><b>Method</b></td><td><b>Description</b></td></tr>
+<tr><td>send(String peerName, BSPMessage msg)</td><td>Sends a message to another peer.</td></tr>
+<tr><td>getCurrentMessage()</td><td>Returns a received message.</td></tr>
+<tr><td>getNumCurrentMessages()</td><td>Returns the number of received messages.</td></tr>
+<tr><td>sync()</td><td>Barrier synchronization.</td></tr>
+<tr><td>getPeerName()</td><td>Returns a peer's hostname.</td></tr>
+<tr><td>getAllPeerNames()</td><td>Returns all peers' hostnames.</td></tr>
+<tr><td>getSuperstepCount()</td><td>Returns the count of supersteps.</td></tr>
+</table>
+
+    <p>The send() and all the other functions are very flexible. Here is an example that sends a message to all peers:</p>
+
+    <pre class="green">
+    @Override
+    public void bsp(
+        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
+        throws IOException, SyncException, InterruptedException {
+      for (String peerName : peer.getAllPeerNames()) {
+        peer.send(peerName,
+          new LongMessage("Hello from " + peer.getPeerName(), System.currentTimeMillis()));
+      }
+
+      peer.sync();
+    }</pre>
+
+    <h5>Synchronization</h5>
+
+    <p>When all the processes have entered the barrier via the sync() method,
+    Hama proceeds to the next superstep.
+    In the previous example, the BSP job finishes after a single synchronization,
+    once each peer has sent a "Hello from ..." message to all peers.
+    <br/><br/>
+    However, keep in mind that calling sync() does not end the BSP job.
+    As previously mentioned, all the communication functions are very flexible.
+    For example, the sync() method can also be called in a for loop,
+    so that you can program iterative methods sequentially:</p>
+
+    <pre class="green">
+    @Override
+    public void bsp(
+        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
+        throws IOException, SyncException, InterruptedException {
+      for (int i = 0; i < 100; i++) {
+        // send some messages
+        peer.sync();
+      }
+    }</pre>
+
+
+    <subsection name="Shell Command Line Interfaces"></subsection>
+    <p>Hama provides several commands for BSP job administration:</p>
+
+<table align="center" border="0">
+<tr><td><b>Command</b></td><td><b>Description</b></td></tr>
+<tr><td>-submit <job-file></td><td>Submits the job.</td></tr>
+<tr><td>-status <job-id></td><td>Prints the job status.</td></tr>
+<tr><td>-kill <job-id></td><td>Kills the job.</td></tr>
+<tr><td>-list [all]</td><td>-list all displays all jobs. -list displays only jobs which are yet to be completed.</td></tr>
+<tr><td>-list-active-grooms</td><td>Displays the list of active groom servers in the cluster.</td></tr>
+<tr><td>-list-attempt-ids <jobId> <task-state></td><td>Displays the list of tasks for a given job currently in a particular state (running or completed).</td></tr>
+<tr><td>-kill-task <task-id></td><td>Kills the task. Killed tasks are NOT counted against failed attempts.</td></tr>
+<tr><td>-fail-task <task-id></td><td>Fails the task.
+Failed tasks are counted against failed attempts.</td></tr>
+</table>
+
+    <subsection name="Example: Pi Calculation"></subsection>
+    <p>Here is a BSP-based Pi calculation example that is submitted to a Hama cluster:</p>
+    <pre class="green">
+public class PiEstimator {
+  private static Path TMP_OUTPUT = new Path("/tmp/pi-" + System.currentTimeMillis());
+
+  public static class MyEstimator extends
+      BSP<NullWritable, NullWritable, Text, DoubleWritable> {
+    public static final Log LOG = LogFactory.getLog(MyEstimator.class);
+    private String masterTask;
+    private static final int iterations = 10000;
+
+    @Override
+    public void bsp(
+        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
+        throws IOException, SyncException, InterruptedException {
+
+      int in = 0, out = 0;
+      for (int i = 0; i < iterations; i++) {
+        double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
+        if ((Math.sqrt(x * x + y * y) < 1.0)) {
+          in++;
+        } else {
+          out++;
+        }
+      }
+
+      double data = 4.0 * (double) in / (double) iterations;
+      DoubleMessage estimate = new DoubleMessage(peer.getPeerName(), data);
+
+      peer.send(masterTask, estimate);
+      peer.sync();
+    }
+
+    @Override
+    public void setup(
+        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
+        throws IOException {
+      // Choose one as a master
+      this.masterTask = peer.getPeerName(peer.getNumPeers() / 2);
+    }
+
+    public void cleanup(
+        BSPPeer<NullWritable, NullWritable, Text, DoubleWritable> peer)
+        throws IOException {
+      if (peer.getPeerName().equals(masterTask)) {
+        double pi = 0.0;
+        int numPeers = peer.getNumCurrentMessages();
+        DoubleMessage received;
+        while ((received = (DoubleMessage) peer.getCurrentMessage()) != null) {
+          pi += received.getData();
+        }
+
+        pi = pi / numPeers;
+        peer
+            .write(new Text("Estimated value of PI is"), new DoubleWritable(pi));
+      }
+    }
+  }
+
+  static void printOutput(HamaConfiguration conf) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] files =
+        fs.listStatus(TMP_OUTPUT);
+    for (int i = 0; i < files.length; i++) {
+      if (files[i].getLen() > 0) {
+        FSDataInputStream in = fs.open(files[i].getPath());
+        IOUtils.copyBytes(in, System.out, conf, false);
+        in.close();
+        break;
+      }
+    }
+
+    fs.delete(TMP_OUTPUT, true);
+  }
+
+  public static void main(String[] args) throws InterruptedException,
+      IOException, ClassNotFoundException {
+    // BSP job configuration
+    HamaConfiguration conf = new HamaConfiguration();
+
+    BSPJob bsp = new BSPJob(conf, PiEstimator.class);
+    // Set the job name
+    bsp.setJobName("Pi Estimation Example");
+    bsp.setBspClass(MyEstimator.class);
+    bsp.setInputFormat(NullInputFormat.class);
+    bsp.setOutputKeyClass(Text.class);
+    bsp.setOutputValueClass(DoubleWritable.class);
+    bsp.setOutputFormat(TextOutputFormat.class);
+    FileOutputFormat.setOutputPath(bsp, TMP_OUTPUT);
+
+    BSPJobClient jobClient = new BSPJobClient(conf);
+    ClusterStatus cluster = jobClient.getClusterStatus(true);
+
+    if (args.length > 0) {
+      bsp.setNumBspTask(Integer.parseInt(args[0]));
+    } else {
+      // Set to maximum
+      bsp.setNumBspTask(cluster.getMaxTasks());
+    }
+
+    long startTime = System.currentTimeMillis();
+    if (bsp.waitForCompletion(true)) {
+      printOutput(conf);
+      System.out.println("Job Finished in "
+          + (double) (System.currentTimeMillis() - startTime) / 1000.0
+          + " seconds");
+    }
+  }
+}</pre>
   </body>
 </document>