On Feb 3, 2011, at 6:25 PM, Allen Wittenauer wrote:

>       I think you need to give a more concrete example of what you are doing. 
>  -cache is used for sending files with your job and should have no bearing on 
> what your input is to your job.  Something tells me that you've cooked 
> something up that is overly complex. :D

Fair enough.  Here's my setup.

Our data consists of a few million binary (image) files, only about 6MB each 
uncompressed, 2.5MB gzipped.  A typical job does not need to touch every image, 
rather only a tiny subset of the images is required.  In this sense our job is 
unlike a canonical Hadoop job which combs an entire database.  As such our 
current implementation (not streaming, but "normal" Hadoop) first uses a SQL 
query against a non-Hadoop database to find the names of the relevant images for 
a given job, then sets up input splits for those individual files and assigns 
them as input to the Hadoop job.

Side note: since the images are so small and we use so many at a time, they are 
packed into sequence files.  Building the input splits is a very precise piece 
of code since it must splice out the necessary image files from their locations 
in the sequence files as eventual Hadoop input splits.

The mappers receive byte arrays as input from the record reader (in the form of 
a WritableComparable class that contains the byte array).

Our computational heavy lifting resides in a very large and complex set of 
native libraries (C++) which our Java mappers call using JNI, passing the byte 
arrays (the image files slurped into memory) through JNI to a small native .so 
file which then coordinates the processing with the underlying processing 
library.  The native routines return byte arrays through JNI that represent the 
converted images.  The Java mappers pass the byte arrays on to the reducer 
stage for final processing.  In our current design, the reducer's computational 
complexity is simple enough to be implemented and performed entirely in Java, 
sans JNI or native code.

This system works fine at an organizational level, but unfortunately we have 
been plagued by extremely insidious segfaults and memory-allocation exceptions 
deep in the native library, bugs which absolutely never occur in a non-Hadoop or 
non-Java environment when the library is accessed solo from a native C++ program 
(or from python scripts).

I have determined that the problem is not an incompatibility between the native 
library and Hadoop, since a non-Hadoop Java program, using the same JNI interface 
to access our native routines, exhibits the same errors.  This is some 
fundamental incompatibility between Java/JVM/JNI and our native library.  Like 
I said, the library is very complicated.  It involves extremely fancy template 
programming and I suspect JNI is simply not properly coordinating memory 
between the two environments.  I've valgrinded it and looked at the core 
dumps, but I haven't found an underlying cause yet, much less fixed the problem.

Which brings me to streaming.  Serendipitously, our system seems to work 
considerably better if I don't use Java mappers and JNI, but rather use 
streaming to run a small native program that then coordinates the calls into 
the native library (further evidence that the problem lies somewhere in JNI's 
functionality).  I am not sure how to arrange our input data for streaming 
however.  It is binary of course, which has been a long-standing issue (I 
believe) with Hadoop streaming, since streaming is not 8-bit clean: it assumes 
text records terminated by newlines.  Furthermore, I'm not really sure how I 
would indicate which files are the input for a given job, since I don't know 
how to write a driver program for streaming.  I only know how to use the 
streaming jar directly from the command line, so there is no driver of my 
choosing.  I can't control the driver at all; it's the "streaming" driver, and 
all I can do is give it my mapper and reducer native programs to run.  You see 
my problem?  With streaming I have no driver, just the mappers and reducers, 
but with a Java Hadoop program I have all three, and I can do useful 
preprocessing in the driver, like setting up the input paths to the subset of 
images comprising the input dataset for a given job.  This is quite perplexing 
for me.  How do I use streaming effectively if I can't write my own driver?
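
To make the binary issue mentioned above concrete, here is a minimal sketch in 
Python.  It is not part of the system described in this message; it only 
illustrates one commonly used workaround, which is to encode a binary payload 
as base64 text so that it cannot contain TAB or newline bytes.  The function 
names and the sample key are hypothetical.

```python
import base64


def encode_record(key, payload):
    """Return one streaming-style line body: key, TAB, base64 text of payload."""
    # Standard base64 output uses only letters, digits, '+', '/' and '=',
    # so the encoded value never contains a TAB or a newline.
    return key + "\t" + base64.b64encode(payload).decode("ascii")


def decode_record(line):
    """Inverse of encode_record; tolerates a trailing newline."""
    key, _, encoded = line.rstrip("\n").partition("\t")
    return key, base64.b64decode(encoded)


# Raw bytes that would break line-oriented parsing: they contain a newline
# (0x0A) and a TAB (0x09) in the middle of the payload.
raw = bytes([0x89, 0x0A, 0x09, 0x00, 0xFF])
line = encode_record("img_0001", raw)
```

The cost is size: base64 turns every 3 bytes into 4 characters, so a 6MB image 
becomes roughly 8MB of text.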

Anyway, what I've come up with is to create a text file that lists the input 
files relevant to a job.  That one text file is the streaming input, each line 
of which is an input record.  Each line of the file is a properly formatted 
streaming key/val pair (meaning a TAB separates the key and the value).  
Interestingly, I don't need the key/value pairs per se, I only need the input 
filenames.  So basically I don't use the key very much and store the input 
file names in the value (meaning right after the TAB on each line of the input 
file).  My native program receives a set of these lines from Hadoop, opens 
each file, and operates on it.  To do this, the file must be accessible to the 
native code, and the only way I know to do that is for the 
file to be in the distributed cache (which is aliased to the current working 
directory of the native program), thus my previously described method of 
putting symbolic links to all the input files in the distributed cache using 
the -cacheFile flag.  That way the native program can open them when it finds 
their names in the streaming input records.
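
The record handling described above can be sketched as follows.  This is a 
Python stand-in for the native mapper, written under assumptions: the function 
names are hypothetical, the "processing" step is a placeholder that only 
counts bytes, and the files are assumed to be reachable by name from the 
working directory (as the cache symlinks would make them).

```python
import os


def parse_record(line):
    """Split one streaming input line into (key, file name)."""
    key, sep, value = line.rstrip("\n").partition("\t")
    if not sep:
        # Assumption: a line without a TAB is treated as a bare file name.
        return "", key
    return key, value


def run_mapper(lines, out, workdir="."):
    """Open each file named in the input records and emit one output record."""
    for line in lines:
        if not line.strip():
            continue  # skip blank lines
        _key, name = parse_record(line)
        with open(os.path.join(workdir, name), "rb") as f:
            data = f.read()
        # Placeholder for the real work (the call into the native library):
        # emit the file name and the number of bytes read.
        out.write("%s\t%d\n" % (name, len(data)))
```

A real streaming mapper would call run_mapper(sys.stdin, sys.stdout) and leave 
workdir at its default, since streaming delivers the records on standard 
input.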

So that's what I've got going on.  If you have a better design in mind, I would 
love to hear about it.  I admit, this all feels rather contrived, but I simply 
don't know how else to do it.  The documentation is rather thin on these topics 
so I'm kind of making it up as I go along.  For example, I have not found a 
single code example anywhere demonstrating how to use the streaming API with a 
user-defined driver.  There are examples of this for Java-only Hadoop programs 
so that made it easy for me to learn how to do that, but for streaming, no such 
luck -- I'm not even sure such a thing is possible with streaming.  
Furthermore, I remain confounded how to set up or otherwise specify a subset of 
files on HDFS as input records to streaming.  I could give it the root 
directory of my entire database and it would happily feed all of the images to 
my job, but like I said, I only need a tiny subset for any one job.  My driver 
would usually handle this on a per-file basis, but I don't have one in 
streaming.  Instead, I give it a single file as streaming input which contains 
a list of relevant files in HDFS for processing a given job.
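
As a rough illustration of the workaround just described, the sketch below 
builds the list file and the matching -cacheFile arguments from a set of 
selected images.  Everything specific in it is an assumption made for the 
example: the HDFS URIs, the host and port, the file names, and the helper 
function names.  The URI#linkname form is the one -cacheFile uses to name the 
symlink in the task's working directory.

```python
import posixpath


def link_name(hdfs_uri):
    """Use the last path component as the symlink name in the task's cwd."""
    return posixpath.basename(hdfs_uri)


def write_input_list(hdfs_uris, list_path):
    """Write one 'key<TAB>link name' line per selected image."""
    with open(list_path, "w") as f:
        for i, uri in enumerate(hdfs_uris):
            # The key is just a running index; only the value is used.
            f.write("%d\t%s\n" % (i, link_name(uri)))


def cache_file_args(hdfs_uris):
    """Build one '-cacheFile URI#link' argument pair per selected image."""
    args = []
    for uri in hdfs_uris:
        args += ["-cacheFile", "%s#%s" % (uri, link_name(uri))]
    return args


# Hypothetical result of the SQL selection step.
selected = [
    "hdfs://namenode:8020/images/img_000017.dat",
    "hdfs://namenode:8020/images/img_004211.dat",
]
```

Note that this produces one flag per image, so the command line grows with the 
size of the subset.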

Thank you very much.

Cheers!

________________________________________________________________________________
Keith Wiley     [email protected]     keithwiley.com    music.keithwiley.com

"The easy confidence with which I know another man's religion is folly teaches
me to suspect that my own is also."
                                           --  Mark Twain
________________________________________________________________________________
