[ http://issues.apache.org/jira/browse/HADOOP-234?page=comments#action_12442121 ]

Sanjay Dahiya commented on HADOOP-234:
--------------------------------------


   [[ Old comment, sent by email on Mon, 22 May 2006 17:28:16 +0530 ]]

Hi Doug -
I was also looking for a way to avoid serialization and
deserialization, but I am still not clear on how we can use the existing
Record IO with this (without modifying the generated classes).
When we define a new record format and generate classes to work with
the record, the generated classes contain a Reader and a Writer for the
record. These read/write 'Record' objects from streams. One way to
implement this would be to modify the class generation so that the
Reader extends SequenceInputFile and (optionally) returns a BytesWritable
rather than a Record. With this in place we should be able to avoid
serialization/deserialization, and users will not need
to write extra code per record type. Or am I missing something here?
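
A minimal sketch of the idea, assuming records are stored with a 4-byte
length prefix (an assumption made for illustration; the actual Record IO
framing may differ). RawRecordReader and frame are hypothetical names and
are not generated by Record IO:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

// Hypothetical sketch: hands out each record as raw bytes instead of
// decoding it into a Record object on the Java side.
final class RawRecordReader {
    private final DataInputStream in;

    RawRecordReader(DataInputStream in) {
        this.in = in;
    }

    /** Returns the next record's bytes undecoded, or null at end of stream. */
    byte[] nextRaw() throws IOException {
        int len;
        try {
            len = in.readInt();          // assumed 4-byte length prefix
        } catch (EOFException e) {
            return null;                 // no more records
        }
        byte[] buf = new byte[len];
        in.readFully(buf);               // copy the payload without interpreting it
        return buf;
    }

    /** Test helper: writes records in the assumed length-prefixed framing. */
    static byte[] frame(byte[]... records) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        for (byte[] r : records) {
            out.writeInt(r.length);
            out.write(r);
        }
        out.flush();
        return bytes.toByteArray();
    }
}
```

The byte[] returned by nextRaw() could be handed to the native map method
directly, so only the C++ side would ever decode the record.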

~Sanjay







> Support for writing Map/Reduce functions in C++
> -----------------------------------------------
>
>                 Key: HADOOP-234
>                 URL: http://issues.apache.org/jira/browse/HADOOP-234
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Sanjay Dahiya
>         Attachments: Hadoop MaReduce Developer doc.pdf, Hadoop MaReduce 
> Developer doc.pdf
>
>
> MapReduce C++ support
> Requirements 
> 1. Allow users to write Map and Reduce functions in C++; the rest of the 
>    infrastructure already present in Java should be reused. 
> 2. Avoid users having to write both Java and C++ for this to work. 
> 3. Avoid users having to work with JNI methods directly by wrapping them in 
>    helper functions. 
> 4. Use Record IO for describing the record format; both the Java MR framework 
>    and C++ should use the same format to work seamlessly. 
> 5. Allow users to write simple MapReduce tasks without learning Record IO if 
>    keys and values are simple strings. 
> Implementation notes
> - If keys and values are simple strings, the user passes SimpleNativeMapper 
>   in JobConf and implements the mapper and reducer methods in C++. 
> - For composite Record IO types, the user starts by defining a record format 
>   using the Record IO DDL. 
> - The user generates Java and C++ classes from the DDL using Record IO. 
> - The user configures JobConf to use the generated Java classes as the MR 
>   input/output key/value classes. 
> - The user writes Map and Reduce functions in C++ using a standard interface 
>   (given below). This interface makes the serialized Record IO format 
>   available to the C++ function, which should deserialize it into the 
>   corresponding generated C++ Record IO classes. 
> - The user uses the helper functions to pass the serialized form of the 
>   generated output key/value pairs to the output collector. 
> The following is pseudocode for the Mapper (the Reducer can be implemented 
> similarly): 
> Native(JNI) Java proxy for the Mapper : 
> ---------------------------------------
> Without Record IO :-
> --------------------
> public class SimpleNativeMapper extends MapReduceBase implements Mapper {
>       /**
>        * Works on simple strings. 
>        */
>       public void map(WritableComparable key, Writable value,
>                       OutputCollector output, Reporter reporter)
>                       throws IOException {
>               mapNative(key.toString().getBytes(),
>                               value.toString().getBytes(), output, reporter); 
>       }
>       
>       /**
>        * Native implementation. 
>        */
>       private native void mapNative(byte[] key, byte[] value, 
>                       OutputCollector output, Reporter reporter)
>                       throws IOException;
> }
> With Record IO :- 
> ------------------
> public class RecordIONativeMapper extends MapReduceBase implements Mapper {
>       /**
>        * Implementation of the map method. This acts as a JNI proxy for the 
>        * actual map method implemented in C++. Works for Record IO based records. 
>        * @see #mapNative(byte[], byte[], OutputCollector, Reporter)
>        */
>       public void map(WritableComparable key, Writable value,
>                       OutputCollector output, Reporter reporter)
>                       throws IOException {
>               
>               byte[] keyBytes = null ;
>               byte[] valueBytes = null ;
>               
>               try{
>                       // We need to serialize the key and value and pass the 
>                       // serialized format to the C++ / JNI methods so they can 
>                       // interpret it using the appropriate record IO classes. 
>                       {
>                               ByteArrayOutputStream keyStream =
>                                       new ByteArrayOutputStream() ; 
>                               BinaryOutputArchive boa = new BinaryOutputArchive(
>                                       new DataOutputStream(keyStream)) ;
>                               
>                               ((Record)key).serialize(boa, "WhatIsTag");
>                               keyBytes = keyStream.toByteArray();
>                       }
>                       {
>                               ByteArrayOutputStream valueStream =
>                                       new ByteArrayOutputStream() ; 
>                               BinaryOutputArchive boa = new BinaryOutputArchive(
>                                       new DataOutputStream(valueStream)) ;
>                               
>                               // serialize the value here, not the key
>                               ((Record)value).serialize(boa, "WhatIsTag");
>                               valueBytes = valueStream.toByteArray();
>                       }
>               }catch(ClassCastException e){
>                       // throw better exceptions
>                       throw new IOException(
>                               "Input record must be of Record IO Type"); 
>               }
>               // Pass the serialized byte[] to the C++ implementation. 
>               mapNative(keyBytes, valueBytes, output, reporter); 
>       }
>       /**
>        * Implementation in C++.
>        */
>       private native void mapNative(byte[] key, byte[] value, 
>                       OutputCollector output, Reporter reporter)
>                       throws IOException;
> }
> OutputCollector Proxy for C++
> ------------------------------
> public class NativeOutputCollector implements OutputCollector {
>       // standard method from interface
>       public void collect(WritableComparable key, Writable value)
>                       throws IOException {
>       }
>       
>       // deserializes key and value and calls
>       // collect(WritableComparable, Writable)
>       public void collectFromNative(byte[] key, byte[] value){
>               // deserialize key and value to Java types (as configured in 
>               // JobConf)
>               // call actual collect method 
>       }
> }
> Core Native functions ( helper for user provided Mapper and Reducer ) 
> ---------------------------------------------------------------------
> #include "org_apache_hadoop_mapred_NativeMapper.h"
> #include "UserMapper.h"
> /**
>       A C++ proxy method that calls the actual implementation of the Mapper. 
>       This method signature is generated by javah.
> **/
> JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_NativeMapper_mapNative
>   (JNIEnv *env, jobject thisObj, jbyteArray key, jbyteArray value, 
>       jobject output_collector, jobject reporter)
> {
>       // Get the raw bytes and pass them on to the user-defined map method. 
>       // The user's map method should take care of converting them to the 
>       // correct record IO type.
>       // Lengths of the serialized buffers (the data is binary, so it is
>       // not NUL-terminated).
>       jsize keyLen = env->GetArrayLength(key) ;
>       jsize valueLen = env->GetArrayLength(value) ;
>       jbyte *keyBuf = env->GetByteArrayElements(key, NULL) ;
>       jbyte *valueBuf = env->GetByteArrayElements(value, NULL) ;
>       
>       // Call the user-defined method 
>       user_mapper((const char *)keyBuf, (const char *)valueBuf,
>               output_collector, reporter) ;
>       
>       // JNI_ABORT: release without copying back; the buffers were only read
>       env->ReleaseByteArrayElements(key, keyBuf, JNI_ABORT) ; 
>       env->ReleaseByteArrayElements(value, valueBuf, JNI_ABORT) ; 
> }
> /**
>       Helper method, acts as a proxy to the OutputCollector in Java. key and 
>       value must be serialized forms of records as specified in JobConf. 
> **/
> void output_collector(const char * key, const char *value,
>       jobject output_collector, jobject reporter){
>       
>       // invoke Java NativeOutputCollector.collectFromNative with key and value. 
> }
> User defined Mapper ( and Reducer ) 
> ------------------------------------
> /**
>       Implements the user-defined map operation. 
> **/
> void user_mapper(const char *key, const char *value, jobject collector, 
>       jobject reporter) {
>       //1. Deserialize key/value into the appropriate format using record IO. 
>       
>       //2. Process key/value and generate the intermediate key/values in 
>       //   record IO format. 
>       
>       //3. Serialize the intermediate key/values to intermed_key and 
>       //   intermed_value.
>       
>       //4. Pass intermed_key/intermed_value on using the helper function: 
>       //              output_collector(intermed_key, intermed_value, 
>       //                      collector, reporter);
> }
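
For the simple-string case, the Java side of the collector proxy described
above could look roughly like the sketch below. TextCollector is a local
stand-in for Hadoop's OutputCollector so that the example compiles on its
own, and NativeTextCollectorProxy is a hypothetical name. The sketch decodes
with an explicit UTF-8 charset, on the assumption that the C++ side emits
UTF-8; getBytes() without an argument uses the platform default charset,
which can differ between machines.

```java
import java.nio.charset.StandardCharsets;

// Stand-in for Hadoop's OutputCollector (assumption, for a self-contained sketch).
interface TextCollector {
    void collect(String key, String value);
}

// Hypothetical proxy for the simple-string case: receives serialized
// key/value bytes from native code and forwards them as Java strings.
final class NativeTextCollectorProxy {
    private final TextCollector target;

    NativeTextCollectorProxy(TextCollector target) {
        this.target = target;
    }

    // In the proposed design this would be invoked from C++ through JNI.
    void collectFromNative(byte[] key, byte[] value) {
        // Decode with a fixed charset so both sides agree on the encoding.
        String k = new String(key, StandardCharsets.UTF_8);
        String v = new String(value, StandardCharsets.UTF_8);
        target.collect(k, v);
    }
}
```

For Record IO types, the two String conversions would be replaced by
deserializing into the key/value classes configured in JobConf.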
