[ http://issues.apache.org/jira/browse/HADOOP-234?page=comments#action_12412836 ]
Owen O'Malley commented on HADOOP-234:
--------------------------------------

I agree with Doug that if you are going to throw away the advantage of
actually having meaningful types on the C++ side, then the keys and values
on the Java side should be BytesWritable. That said, I think it would be
much less error-prone for users, and easier to understand and debug, if you
followed the Hadoop API much more closely: define Writable and
WritableComparable interfaces in C++. The Record IO classes will support
them with a minor change to the code generator.

> Support for writing Map/Reduce functions in C++
> -----------------------------------------------
>
>          Key: HADOOP-234
>          URL: http://issues.apache.org/jira/browse/HADOOP-234
>      Project: Hadoop
>         Type: New Feature
>   Components: mapred
>     Reporter: Sanjay Dahiya
>
> MapReduce C++ support
>
> Requirements
> 1. Allow users to write Map and Reduce functions in C++; the rest of the
>    infrastructure already present in Java should be reused.
> 2. Avoid users having to write both Java and C++ for this to work.
> 3. Avoid users having to work with JNI methods directly by wrapping them
>    in helper functions.
> 4. Use Record IO for describing the record format; both the Java MR
>    framework and C++ should use the same format to work seamlessly.
> 5. Allow users to write simple map/reduce tasks without learning Record IO
>    if keys and values are simple strings.
>
> Implementation notes
> - If keys and values are simple strings, the user passes SimpleNativeMapper
>   in JobConf and implements the mapper and reducer methods in C++.
> - For composite Record IO types, the user starts by defining a record
>   format using the Record IO DDL.
> - The user generates Java and C++ classes from the DDL using Record IO.
> - The user configures JobConf to use the generated Java classes as the MR
>   input/output key/value classes.
> - The user writes Map and Reduce functions in C++ using a standard
>   interface (given below); this interface makes the serialized Record IO
>   data available to the C++ function, which should deserialize it into the
>   corresponding generated C++ Record IO classes.
> - The user uses the helper functions to pass the serialized form of the
>   generated output key/value pairs to the output collector.
>
> The following is pseudocode for the Mapper (the Reducer can be implemented
> similarly).
>
> Native (JNI) Java proxy for the Mapper:
> ---------------------------------------
>
> Without Record IO:
> ------------------
>
> public class SimpleNativeMapper extends MapReduceBase implements Mapper {
>
>   /**
>    * Works on simple strings.
>    */
>   public void map(WritableComparable key, Writable value,
>       OutputCollector output, Reporter reporter) throws IOException {
>     mapNative(key.toString().getBytes(), value.toString().getBytes(),
>         output, reporter);
>   }
>
>   /**
>    * Native implementation.
>    */
>   private native void mapNative(byte[] key, byte[] value,
>       OutputCollector output, Reporter reporter) throws IOException;
> }
>
> With Record IO:
> ---------------
>
> public class RecordIONativeMapper extends MapReduceBase implements Mapper {
>
>   /**
>    * Implementation of the map method; acts as a JNI proxy for the actual
>    * map method implemented in C++. Works for Record IO based records.
>    * @see #mapNative(byte[], byte[], OutputCollector, Reporter)
>    */
>   public void map(WritableComparable key, Writable value,
>       OutputCollector output, Reporter reporter) throws IOException {
>
>     byte[] keyBytes = null;
>     byte[] valueBytes = null;
>
>     try {
>       // We need to serialize the key and value and pass the serialized
>       // form to the C++/JNI methods so they can interpret it using the
>       // appropriate Record IO classes.
>       {
>         ByteArrayOutputStream keyStream = new ByteArrayOutputStream();
>         BinaryOutputArchive boa =
>             new BinaryOutputArchive(new DataOutputStream(keyStream));
>         ((Record) key).serialize(boa, "WhatIsTag");
>         keyBytes = keyStream.toByteArray();
>       }
>       {
>         ByteArrayOutputStream valueStream = new ByteArrayOutputStream();
>         BinaryOutputArchive boa =
>             new BinaryOutputArchive(new DataOutputStream(valueStream));
>         ((Record) value).serialize(boa, "WhatIsTag");
>         valueBytes = valueStream.toByteArray();
>       }
>     } catch (ClassCastException e) {
>       // TODO: throw better exceptions
>       throw new IOException("Input record must be of Record IO type");
>     }
>     // Pass the serialized byte[] to the C++ implementation.
>     mapNative(keyBytes, valueBytes, output, reporter);
>   }
>
>   /**
>    * Implementation in C++.
>    */
>   private native void mapNative(byte[] key, byte[] value,
>       OutputCollector output, Reporter reporter) throws IOException;
> }
>
> OutputCollector proxy for C++
> -----------------------------
>
> public class NativeOutputCollector implements OutputCollector {
>
>   // Standard method from the interface.
>   public void collect(WritableComparable key, Writable value)
>       throws IOException {
>   }
>
>   // Deserializes key and value and calls
>   // collect(WritableComparable, Writable).
>   public void collectFromNative(byte[] key, byte[] value) {
>     // Deserialize key and value to the Java types configured in JobConf,
>     // then call the actual collect method.
>   }
> }
>
> Core native functions (helpers for the user-provided Mapper and Reducer)
> ------------------------------------------------------------------------
>
> #include "org_apache_hadoop_mapred_NativeMapper.h"
> #include "UserMapper.h"
>
> /**
>  * A C proxy method that calls the actual implementation of the Mapper.
>  * This method signature is generated by javah.
>  **/
> JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_NativeMapper_mapNative
>     (JNIEnv *env, jobject thisObj, jbyteArray key, jbyteArray value,
>      jobject collector, jobject reporter)
> {
>     // Convert the byte arrays to char* and pass them on to the
>     // user-defined map method, which takes care of converting them to
>     // the correct Record IO types. The lengths are passed along because
>     // the serialized data is not NUL-terminated.
>     jsize keyLen = (*env)->GetArrayLength(env, key);
>     jsize valueLen = (*env)->GetArrayLength(env, value);
>     jbyte *keyBuf = (*env)->GetByteArrayElements(env, key, NULL);
>     jbyte *valueBuf = (*env)->GetByteArrayElements(env, value, NULL);
>
>     // Call the user-defined map method.
>     user_mapper((const char *) keyBuf, keyLen,
>                 (const char *) valueBuf, valueLen, collector, reporter);
>
>     (*env)->ReleaseByteArrayElements(env, key, keyBuf, JNI_ABORT);
>     (*env)->ReleaseByteArrayElements(env, value, valueBuf, JNI_ABORT);
> }
>
> /**
>  * Helper method; acts as a proxy to the OutputCollector in Java. key and
>  * value must be serialized forms of the records configured in JobConf.
>  **/
> void output_collector(const char *key, int keyLen,
>                       const char *value, int valueLen,
>                       jobject collector, jobject reporter)
> {
>     // Invoke NativeOutputCollector.collectFromNative with key and value.
> }
>
> User-defined Mapper (and Reducer)
> ---------------------------------
>
> /**
>  * Implements the user-defined map operation.
>  **/
> void user_mapper(const char *key, int keyLen,
>                  const char *value, int valueLen,
>                  jobject collector, jobject reporter)
> {
>     // 1. Deserialize key/value into the appropriate types using Record IO.
>     // 2. Process key/value and generate the intermediate key/values as
>     //    Record IO types.
>     // 3. Serialize the intermediate key/values to intermed_key and
>     //    intermed_value.
>     // 4. Pass intermed_key/intermed_value on via the helper function:
>     //    output_collector(intermed_key, intermed_keyLen,
>     //                     intermed_value, intermed_valueLen,
>     //                     collector, reporter);
> }

--
This message is automatically generated by JIRA.
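Owen's suggestion of C++ Writable and WritableComparable interfaces could look roughly like the sketch below. The archive classes and the IntWritable example are illustrative assumptions for the sake of a round-trip demo; they are not part of Hadoop or Record IO, and the real code generator would emit its own archive types.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Illustrative output archive: serializes into an in-memory byte buffer.
// The real Record IO generator would provide its own archive classes.
class OutArchive {
public:
    std::string buf;
    void writeInt(int32_t v) {
        // Little-endian encoding, one byte at a time.
        for (int i = 0; i < 4; ++i)
            buf.push_back(static_cast<char>(
                (static_cast<uint32_t>(v) >> (8 * i)) & 0xffu));
    }
};

// Illustrative input archive: reads back from the same byte buffer.
class InArchive {
public:
    explicit InArchive(const std::string &s) : buf(s), pos(0) {}
    int32_t readInt() {
        uint32_t v = 0;
        for (int i = 0; i < 4; ++i)
            v |= static_cast<uint32_t>(
                     static_cast<unsigned char>(buf[pos++])) << (8 * i);
        return static_cast<int32_t>(v);
    }
private:
    std::string buf;
    size_t pos;
};

// C++ analogue of Hadoop's Writable: a type that can serialize and
// deserialize itself through an archive.
class Writable {
public:
    virtual ~Writable() {}
    virtual void serialize(OutArchive &out) const = 0;
    virtual void deserialize(InArchive &in) = 0;
};

// C++ analogue of WritableComparable: a Writable with a total order,
// as needed for keys.
class WritableComparable : public Writable {
public:
    virtual int compareTo(const WritableComparable &other) const = 0;
};

// A minimal key type implementing the interfaces.
class IntWritable : public WritableComparable {
public:
    explicit IntWritable(int32_t v = 0) : value(v) {}
    void serialize(OutArchive &out) const { out.writeInt(value); }
    void deserialize(InArchive &in) { value = in.readInt(); }
    int compareTo(const WritableComparable &other) const {
        const IntWritable &o = static_cast<const IntWritable &>(other);
        return value < o.value ? -1 : (value == o.value ? 0 : 1);
    }
    int32_t get() const { return value; }
private:
    int32_t value;
};
```

With interfaces like these, the JNI proxy could stay generic: it only needs the serialized bytes, while the user's C++ code works against typed objects.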
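For the "simple strings" path (SimpleNativeMapper), the C++ side of the map function can be exercised without a JVM by substituting a plain callback for the output_collector helper. The callback type and the wordcount-style logic below are illustrative assumptions, not code from this issue:

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Stand-in for the output_collector helper: in the real design this would
// cross back into Java via JNI; here it is a plain callback so the map
// logic can be tested in isolation.
typedef std::function<void(const std::string &, const std::string &)>
    Collector;

// Illustrative user map function for the simple-strings path: the key is
// the input offset, the value is a line of text, and the function emits
// (word, "1") pairs, as in the classic wordcount example.
void user_mapper(const char *key, int keyLen,
                 const char *value, int valueLen,
                 const Collector &collect) {
    (void) key;
    (void) keyLen;  // The offset key is unused by wordcount.
    std::string line(value, valueLen);
    size_t pos = 0;
    while (pos < line.size()) {
        // Skip leading spaces, then take the next space-delimited word.
        size_t start = line.find_first_not_of(' ', pos);
        if (start == std::string::npos)
            break;
        size_t end = line.find(' ', start);
        if (end == std::string::npos)
            end = line.size();
        collect(line.substr(start, end - start), "1");
        pos = end;
    }
}
```

Keeping the user function free of JNI types in this way is one route to requirement 3 above (users never touch JNI directly); the proxy layer alone would own the JNIEnv.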
