[ http://issues.apache.org/jira/browse/HADOOP-234?page=comments#action_12442121 ]

Sanjay Dahiya commented on HADOOP-234:
--------------------------------------
[[ Old comment, sent by email on Mon, 22 May 2006 17:28:16 +0530 ]]

Hi Doug - I was also looking for a way to avoid serialization and
deserialization, but I am still not clear on how we use the existing record IO
with this (without modifying the generated classes). When we define a new
record format and generate classes to work with the record, the generated
classes contain a Reader and a Writer for the record. These read/write Record
objects from streams. One way to implement this would be to modify the class
generation so that Reader extends SequenceInputFile and (optionally) returns a
BytesWritable rather than a Record. With this in place we should be able to
avoid the need for serialization/deserialization, and users would not need to
write extra code per record type. Or am I missing something here?

~Sanjay

> Support for writing Map/Reduce functions in C++
> -----------------------------------------------
>
>                 Key: HADOOP-234
>                 URL: http://issues.apache.org/jira/browse/HADOOP-234
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>            Reporter: Sanjay Dahiya
>         Attachments: Hadoop MaReduce Developer doc.pdf, Hadoop MaReduce Developer doc.pdf
>
>
> MapReduce C++ support
>
> Requirements
> 1. Allow users to write Map and Reduce functions in C++; the rest of the
>    infrastructure already present in Java should be reused.
> 2. Avoid users having to write both Java and C++ for this to work.
> 3. Avoid users having to work with JNI methods directly by wrapping them in
>    helper functions.
> 4. Use Record IO for describing the record format; both the Java MR framework
>    and C++ should use the same format so they work together seamlessly.
> 5. Allow users to write simple map reduce tasks without learning record IO if
>    keys and values are simple strings.
>
> Implementation notes
> - If keys and values are simple strings, the user passes SimpleNativeMapper
>   in JobConf and implements the mapper and reducer methods in C++ (a
>   configuration sketch follows the SimpleNativeMapper class below).
> - For composite Record IO types, the user starts by defining a record format
>   using the Record IO DDL.
> - The user generates Java and C++ classes from the DDL using record IO.
> - The user configures JobConf to use the generated Java classes as the MR
>   input/output, key/value classes.
> - The user writes Map and Reduce functions in C++ using a standard interface
>   (given below); this interface makes the serialized record IO format
>   available to the C++ function, which should deserialize it into the
>   corresponding generated C++ record IO classes.
> - The user uses the helper functions to pass the serialized form of the
>   generated output key/value pairs to the output collector.
>
> The following is pseudocode for the Mapper (the Reducer can be implemented
> similarly):
>
> Native (JNI) Java proxy for the Mapper
> ---------------------------------------
>
> Without Record IO :-
> --------------------
> public class SimpleNativeMapper extends MapReduceBase implements Mapper {
>
>     /**
>      * Works on simple strings.
>      **/
>     public void map(WritableComparable key, Writable value,
>                     OutputCollector output, Reporter reporter) throws IOException {
>         mapNative(key.toString().getBytes(), value.toString().getBytes(),
>                   output, reporter);
>     }
>
>     /**
>      * Native implementation.
>      **/
>     private native void mapNative(byte[] key, byte[] value,
>                                   OutputCollector output, Reporter reporter)
>         throws IOException;
> }
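>
> As a rough illustration of the "simple strings" path, the job driver might be
> configured as sketched below. This is only a sketch, not part of the design:
> the library name "nativemapper" and the driver class name are made up, and it
> assumes the current JobConf/JobClient API.
>
> import org.apache.hadoop.mapred.JobClient;
> import org.apache.hadoop.mapred.JobConf;
>
> public class NativeJobDriver {
>     static {
>         // Load the shared library providing mapNative(); in a real job the
>         // library must also be visible to the task JVMs, not just the driver.
>         System.loadLibrary("nativemapper");
>     }
>
>     public static void main(String[] args) throws Exception {
>         JobConf conf = new JobConf(NativeJobDriver.class);
>         conf.setJobName("native-mapper-demo");
>
>         // Keys and values are plain strings, so the simple proxy suffices
>         // and no record IO classes are needed. Input/output paths and
>         // key/value classes are omitted for brevity.
>         conf.setMapperClass(SimpleNativeMapper.class);
>
>         JobClient.runJob(conf);
>     }
> }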
> With Record IO :-
> -----------------
> public class RecordIONativeMapper extends MapReduceBase implements Mapper {
>
>     /**
>      * Implementation of the map method; acts as a JNI proxy for the actual
>      * map method implemented in C++. Works for Record IO based records.
>      * @see #mapNative(byte[], byte[], OutputCollector, Reporter)
>      */
>     public void map(WritableComparable key, Writable value,
>                     OutputCollector output, Reporter reporter) throws IOException {
>
>         byte[] keyBytes = null;
>         byte[] valueBytes = null;
>
>         try {
>             // We need to serialize the key and value and pass the serialized
>             // form to the C++/JNI methods so they can interpret it using the
>             // appropriate record IO classes.
>             {
>                 ByteArrayOutputStream keyStream = new ByteArrayOutputStream();
>                 BinaryOutputArchive boa =
>                     new BinaryOutputArchive(new DataOutputStream(keyStream));
>                 ((Record) key).serialize(boa, "WhatIsTag");
>                 keyBytes = keyStream.toByteArray();
>             }
>             {
>                 ByteArrayOutputStream valueStream = new ByteArrayOutputStream();
>                 BinaryOutputArchive boa =
>                     new BinaryOutputArchive(new DataOutputStream(valueStream));
>                 // Note: serialize the value here, not the key.
>                 ((Record) value).serialize(boa, "WhatIsTag");
>                 valueBytes = valueStream.toByteArray();
>             }
>         } catch (ClassCastException e) {
>             // TODO: throw better exceptions
>             throw new IOException("Input record must be of Record IO type");
>         }
>
>         // Pass the serialized byte[] to the C++ implementation.
>         mapNative(keyBytes, valueBytes, output, reporter);
>     }
>
>     /**
>      * Implementation in C++.
>      */
>     private native void mapNative(byte[] key, byte[] value,
>                                   OutputCollector output, Reporter reporter)
>         throws IOException;
> }
>
> OutputCollector proxy for C++
> -----------------------------
> public class NativeOutputCollector implements OutputCollector {
>
>     // Standard method from the interface.
>     public void collect(WritableComparable key, Writable value) throws IOException {
>     }
>
>     // Deserializes key and value and calls collect(WritableComparable, Writable).
>     public void collectFromNative(byte[] key, byte[] value) {
>         // Deserialize key and value to Java types (as configured in JobConf),
>         // then call the actual collect method.
>     }
> }
>
> Core native functions (helpers for the user-provided Mapper and Reducer)
> -------------------------------------------------------------------------
> #include "org_apache_hadoop_mapred_NativeMapper.h"
> #include "UserMapper.h"
>
> /**
>  * A C proxy method that calls the actual implementation of the Mapper. This
>  * method signature is generated by javah.
>  **/
> JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_NativeMapper_mapNative
>   (JNIEnv *env, jobject thisObj, jbyteArray key, jbyteArray value,
>    jobject collector, jobject reporter)
> {
>     // Extract the serialized bytes and pass them to the user-defined map
>     // method; the user's map method takes care of converting them to the
>     // correct record IO type. Explicit lengths are passed because the
>     // serialized records are binary data, not NUL-terminated strings.
>     jsize keyLen = (*env)->GetArrayLength(env, key);
>     jsize valueLen = (*env)->GetArrayLength(env, value);
>     jbyte *keyBuf = (*env)->GetByteArrayElements(env, key, NULL);
>     jbyte *valueBuf = (*env)->GetByteArrayElements(env, value, NULL);
>
>     // Call the user-defined map method.
>     user_mapper(env, (const char *) keyBuf, keyLen,
>                 (const char *) valueBuf, valueLen, collector, reporter);
>
>     (*env)->ReleaseByteArrayElements(env, key, keyBuf, JNI_ABORT);
>     (*env)->ReleaseByteArrayElements(env, value, valueBuf, JNI_ABORT);
> }
>
> /**
>  * Helper method; acts as a proxy to NativeOutputCollector in Java. key and
>  * value must be serialized forms of records as specified in JobConf. The
>  * JNIEnv pointer is needed to call back into Java.
>  **/
> void output_collector(JNIEnv *env, const char *key, int keyLen,
>                       const char *value, int valueLen,
>                       jobject collector, jobject reporter)
> {
>     // Wrap key/value in jbyteArrays and invoke
>     // NativeOutputCollector.collectFromNative(byte[], byte[]).
> }
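>
> For reference, the byte arrays crossing the JNI boundary are plain record IO
> binary serializations. The round trip below is a minimal sketch of that
> contract, not part of the design: "MyRecord" stands for any class generated
> from a record IO DDL, and it assumes BinaryInputArchive is the reading
> counterpart of the BinaryOutputArchive used above. This is effectively what
> NativeOutputCollector.collectFromNative has to undo.
>
> import java.io.ByteArrayInputStream;
> import java.io.ByteArrayOutputStream;
> import java.io.DataInputStream;
> import java.io.DataOutputStream;
> import java.io.IOException;
> import org.apache.hadoop.record.BinaryInputArchive;
> import org.apache.hadoop.record.BinaryOutputArchive;
>
> public class RecordRoundTrip {
>
>     // Same serialization the map proxy performs before calling mapNative().
>     public static byte[] serialize(MyRecord rec) throws IOException {
>         ByteArrayOutputStream out = new ByteArrayOutputStream();
>         BinaryOutputArchive boa =
>             new BinaryOutputArchive(new DataOutputStream(out));
>         rec.serialize(boa, "WhatIsTag");
>         return out.toByteArray();
>     }
>
>     // What collectFromNative must do to recover the record on the Java side.
>     public static MyRecord deserialize(byte[] bytes) throws IOException {
>         BinaryInputArchive bia = new BinaryInputArchive(
>             new DataInputStream(new ByteArrayInputStream(bytes)));
>         MyRecord rec = new MyRecord();
>         rec.deserialize(bia, "WhatIsTag");
>         return rec;
>     }
> }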
> User defined Mapper (and Reducer)
> ---------------------------------
> /**
>  * Implements the user-defined map operation.
>  **/
> void user_mapper(JNIEnv *env, const char *key, int keyLen,
>                  const char *value, int valueLen,
>                  jobject collector, jobject reporter)
> {
>     // 1. Deserialize key/value into the appropriate types using the
>     //    generated C++ record IO classes.
>
>     // 2. Process key/value and generate the intermediate key/values in
>     //    record IO format.
>
>     // 3. Serialize the intermediate key/values to intermed_key and
>     //    intermed_value.
>
>     // 4. Pass intermed_key/intermed_value on using the helper function:
>     //    output_collector(env, intermed_key, intermed_key_len,
>     //                     intermed_value, intermed_value_len,
>     //                     collector, reporter);
> }

--
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
   http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see:
   http://www.atlassian.com/software/jira