The "HamaPipes" page has been changed by MartinIllecker:
https://wiki.apache.org/hama/HamaPipes?action=diff&rev1=6&rev2=7

- '''Hama Pipes''' is equivalent to '''Hadoop Pipes''' and offers the possibility to use Hama with C/C++.
+ '''Hama Pipes''' is equivalent to '''Hadoop Pipes''' and offers the possibility to use Hama with C++.
  == Installation ==
- You can compile Hama Pipes by executing the following commands:
+ The native compilation is integrated into the Maven build process.
  {{{
- cd $HAMA_HOME/c++/utils
- ./configure
- make install
+ mvn install
-
- cd $HAMA_HOME/c++/pipes
- ./configure
- make install
  }}}
  == Interface ==
- Hama Pipes provides the following methods for C/C++ integration: (similar to the [[BSPModel|BSPModel]])
+ Hama Pipes provides the following methods for C++ integration: (similar to the functions of [[BSPModel#Communication_Model|BSP Communication Model]])
  ||Function||Description||
- ||`void sendMessage(const string& peerName, const string& msg)`||Send a message to another peer. Messages sent by this method are not guaranteed to be received in a sent order.||
+ ||`void sendMessage(const string& peerName, const M& msg)`||Send a message to another peer. Messages sent by this method are not guaranteed to be received in a sent order.||
- ||`string& getCurrentMessage()`||Returns a message from the peer's received messages queue (a FIFO).||
+ ||`M getCurrentMessage()`||Returns a message from the peer's received messages queue (a FIFO).||
  ||`int getNumCurrentMessages()`||Returns the number of messages in the peer's received messages queue.||
  ||`void sync()`||Starts the barrier synchronization and sends all the messages in the outgoing message queues to the corresponding remote peers.||
  ||`long getSuperstepCount()`||Returns the count of current super-step.||
@@ -32, +26 @@
  ||`vector<string> getAllPeerNames()`||Returns the names of all the peers executing tasks from the same job (including this peer).||
  ||`int getNumPeers()`||Returns the number of peers.||
  ||`void clear()`||Clears all queues entries.||
- ||`void write(const string& key, const string& value)`||Writes a key/value pair to the output collector.||
+ ||`void write(const K2& key, const V2& value)`||Writes a key/value pair to the output collector.||
- ||`bool readNext(string& key, string& value)`||Deserializes the next input key value into the given objects.||
+ ||`bool readNext(K1& key, V1& value)`||Deserializes the next input key value into the given objects.||
  ||`void reopenInput()`||Closes the input and opens it right away, so that the file pointer is at the beginning again.||
- The following additional methods support access to [[http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/SequenceFile.html|SequenceFiles]] under C/C++:
+ The following additional methods support access to [[http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/io/SequenceFile.html|SequenceFiles]] in C++:
  ||Function||Description||
  ||`sequenceFileOpen(const string& path, const string& option, const string& keyType, const string& valueType)`||Opens a SequenceFile with option "r" or "w", key/value type and returns the corresponding fileID.||
- ||`bool sequenceFileReadNext(int fileID, string& key, string& value)`||Reads the next key/value pair from the SequenceFile.||
+ ||`bool sequenceFileReadNext(int fileID, K& key, V& value)`||Reads the next key/value pair from the SequenceFile.||
- ||`bool sequenceFileAppend(int fileID, const string& key, const string& value)`||Appends the next key/value pair to the SequenceFile.||
+ ||`bool sequenceFileAppend(int fileID, const K& key, const V& value)`||Appends the next key/value pair to the SequenceFile.||
  ||`bool sequenceFileClose(int fileID)`||Closes a SequenceFile.||
- == C++ BSP example ==
+ == Compilation ==
- Finally here is the [[PiEstimator|Pi Estimator]] example implemented with Hama Pipes:
+ The following command can be used to compile a Hama Pipes application:
+
+ {{{
+ g++ -Ic++/src/main/native/utils/api \
+     -Ic++/src/main/native/pipes/api \
+     -Lc++/target/native \
+     -lhadooputils -lpthread \
+     PROGRAM.cc \
+     -o PROGRAM \
+     -g -Wall -O2
+ }}}
+ Please notice that paths have to be adjusted, if you are not operating in the Hama source folder.
+
+ == Examples ==
+ Hama Pipes includes the following examples:
+  * Summation
+  * PiEstimator
+  * MatrixMultiplication
+
+ These examples are located in `c++/src/main/native/examples` and explained in `c++/src/main/native/examples/README.txt`.
+
+ === PiEstimator Example ===
+
+ This is a Hama Pipes C++ implementation of [[PiEstimator|PiEstimator]].
  {{{
  #include "hama/Pipes.hh"
@@ -55, +72 @@
  #include <time.h>
  #include <math.h>
+ #include <stdlib.h>
  #include <string>
  #include <iostream>
- #include <cstdlib>
  using std::string;
  using std::cout;
@@ -66, +83 @@
  using HamaPipes::BSPContext;
  using namespace HadoopUtils;
- class PiCalculationBSP: public BSP {
+ class PiEstimatorBSP: public BSP<string,string,string,double,int> {
  private:
-   string masterTask;
+   string master_task_;
-   int iterations;
+   long iterations_; // iterations_per_bsp_task
  public:
-   PiCalculationBSP(BSPContext& context) {
+   PiEstimatorBSP(BSPContext<string,string,string,double,int>& context) {
-     iterations = 10000;
+     iterations_ = 1000000L;
+   }
-   }
+
-
    inline double closed_interval_rand(double x0, double x1) {
      return x0 + (x1 - x0) * rand() / ((double) RAND_MAX);
    }
-
-   void bsp(BSPContext& context) {
-
+
+   void setup(BSPContext<string,string,string,double,int>& context) {
+     // Choose one as a master
+     master_task_ = context.getPeerName(context.getNumPeers() / 2);
+   }
+
+   void bsp(BSPContext<string,string,string,double,int>& context) {
-     // initialize random seed
+     /* initialize random seed */
      srand(time(NULL));
      int in = 0;
-     for (int i = 0; i < iterations; i++) {
+     for (long i = 0; i < iterations_; i++) {
-       //rand() -> greater than or equal to 0.0 and less than 1.0.
        double x = 2.0 * closed_interval_rand(0, 1) - 1.0;
-       double y = 2.0 * closed_interval_rand(0, 1) - 1.0;
+       double y = 2.0 * closed_interval_rand(0, 1) - 1.0;
        if (sqrt(x * x + y * y) < 1.0) {
          in++;
        }
-     }
+     }
+
+     context.sendMessage(master_task_, in);
+     context.sync();
+   }
+
+   void cleanup(BSPContext<string,string,string,double,int>& context) {
+     if (context.getPeerName().compare(master_task_)==0) {
-     double data = 4.0 * in / iterations;
+       long total_hits = 0;
+       int msg_count = context.getNumCurrentMessages();
+       for (int i=0; i < msg_count; i++) {
+         total_hits += context.getCurrentMessage();
+       }
+       double pi = 4.0 * total_hits / (msg_count * iterations_);
-     context.sendMessage(masterTask, toString(data));
-     context.sync();
-   }
-
-   void setup(BSPContext& context) {
-     // Choose one as a master
-     masterTask = context.getPeerName(context.getNumPeers() / 2);
-   }
-
-   void cleanup(BSPContext& context) {
-     if (context.getPeerName().compare(masterTask)==0) {
-       double pi = 0.0;
-       int msgCount = context.getNumCurrentMessages();
-       string received;
-       for (int i=0; i<msgCount; i++) {
-         string received = context.getCurrentMessage();
-         pi += toDouble(received);
-       }
-
-       pi = pi / msgCount; //msgCount = numPeers
-       context.write("Estimated value of PI is", toString(pi));
+       context.write("Estimated value of PI", pi);
      }
    }
  };
  int main(int argc, char *argv[]) {
-   return HamaPipes::runTask(HamaPipes::TemplateFactory<PiCalculationBSP>());
+   return HamaPipes::runTask<string,string,string,double,int>(HamaPipes::TemplateFactory<PiEstimatorBSP,string,string,string,double,int>());
  }
  }}}
- Makefile for this example:
- {{{
- CC = g++
- CPPFLAGS = -m64 -I$(HAMA_HOME)/c++/install/include
-
- PiCalculation: PiCalculation.cc
-   $(CC) $(CPPFLAGS) $< -L$(HAMA_HOME)/c++/install/lib -lhamapipes -lhadooputils -lcrypto -lpthread -g -O2 -o $@
-
- clean:
-   rm -f PiCalculation
- }}}
-
- The corresponding job configuration `PiCalculation_job.xml` looks like that:
+ The corresponding job configuration `piestimator.xml` looks like that:
  {{{
- <?xml version="1.0"?>
  <configuration>
    <property>
-     // Set the binary path on DFS
      <name>hama.pipes.executable</name>
-     <value>bin/PiCalculation</value>
+     <value>hdfs:/examples/bin/piestimator</value>
    </property>
    <property>
      <name>hama.pipes.java.recordreader</name>
@@ -161, +159 @@
    </property>
    <property>
      <name>bsp.output.format.class</name>
-     <value>org.apache.hama.bsp.TextOutputFormat</value>
+     <value>org.apache.hama.bsp.SequenceFileOutputFormat</value>
+   </property>
+   <property>
+     <name>bsp.output.key.class</name>
+     <value>org.apache.hadoop.io.Text</value>
+   </property>
+   <property>
+     <name>bsp.output.value.class</name>
+     <value>org.apache.hadoop.io.DoubleWritable</value>
+   </property>
+   <property>
+     <name>bsp.message.class</name>
+     <value>org.apache.hadoop.io.IntWritable</value>
    </property>
    <property>
      <name>hama.pipes.logging</name>
@@ -169, +179 @@
    </property>
    <property>
      <name>bsp.peers.num</name>
-     <value>10</value>
+     <value>3</value>
    </property>
  </configuration>
  }}}
- Finally the PiCalculation example can be submitted with these commands:
+ Finally you can run the PiCalculation example by executing the following commands:
  {{{
+ # First copy piestimator binary to dfs
+ hadoop fs -put c++/target/native/examples/piestimator \
+   /examples/bin/piestimator
+
+ # Run piestimator example
+ hama pipes \
+   -conf c++/src/main/native/examples/conf/piestimator.xml \
+   -output /examples/output/piestimator
+
+ # View output data
+ hama seqdumper -seqFile /examples/output/piestimator/part-00001
+
+ # You should see
+ # Input Path: /examples/output/piestimator/part-00001
+ # Key class: class org.apache.hadoop.io.Text
+ # Value Class: class org.apache.hadoop.io.DoubleWritable
+ # Key: Estimated value of PI: Value: 3.139116
+ # Count: 1
+
- # delete output dir
+ # Delete output folder
+ hadoop fs -rmr /examples/output/piestimator
- hadoop dfs -rmr output/PiCalculation
- # copy piCalculation binary to HDFS
- hadoop dfs -rmr bin/PiCalculation
- hadoop dfs -put PiCalculation bin/PiCalculation
- # submit job
- hama pipes -conf PiCalculation_job.xml -output output/PiCalculation
  }}}
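The arithmetic of the revised `cleanup()` step (sum the integer hit counts from all peers, then divide by `msg_count * iterations_`) can be tried without a Hama cluster. The following is a minimal standalone sketch, not part of Hama Pipes: the helper names `count_hits` and `estimate_pi` are hypothetical, and `std::mt19937` with an explicit per-peer seed is an assumption swapped in for `srand(time(NULL))` and `rand()`, so that each simulated peer draws its own reproducible sequence.

```cpp
#include <cassert>
#include <numeric>
#include <random>
#include <vector>

// Hypothetical stand-in for the loop in bsp(): counts how many of
// `iterations` random points in [-1, 1) x [-1, 1) fall inside the unit circle.
long count_hits(long iterations, unsigned int seed) {
  std::mt19937 gen(seed);  // assumption: replaces srand()/rand() from the wiki example
  std::uniform_real_distribution<double> dist(-1.0, 1.0);
  long hits = 0;
  for (long i = 0; i < iterations; ++i) {
    const double x = dist(gen);
    const double y = dist(gen);
    if (x * x + y * y < 1.0) {
      ++hits;
    }
  }
  return hits;
}

// Hypothetical stand-in for the master's cleanup(): every entry of
// `hits_per_peer` plays the role of one received integer message.
double estimate_pi(const std::vector<long>& hits_per_peer, long iterations_per_peer) {
  assert(!hits_per_peer.empty() && iterations_per_peer > 0);
  const long total_hits =
      std::accumulate(hits_per_peer.begin(), hits_per_peer.end(), 0L);
  const double total_points =
      static_cast<double>(hits_per_peer.size()) * iterations_per_peer;
  return 4.0 * total_hits / total_points;
}
```

For instance, three simulated peers that each report 785 hits out of 1000 iterations give 4 * 2355 / 3000 = 3.14. Seeding every peer differently matters in this sketch: peers that share a seed would draw identical points and contribute no additional information to the estimate.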
