Regarding the race condition: Hadoop creates task-specific temporary
directories in the output directory, one per reduce task, that hold these
output files (as long as you don't use absolute path names).  When the
task completes successfully, the output files from that temporary
directory are moved to the correct place and the temporary task-specific
directory is deleted.  If the reduce task dies or is superseded by another
task, then the directory is simply deleted.  The file is not kept in memory
pending write.
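
To make the write-then-promote behaviour concrete, here is a rough Python
sketch of the same pattern on an ordinary local filesystem.  It is not
Hadoop's code: run_task, the "_tmp_" prefix, and the two example tasks are
names I made up for illustration, and a real killed task would be cleaned
up by the framework rather than by catching an exception.

```python
import os
import shutil
import tempfile


def run_task(output_dir, task_id, produce_files):
    """Run produce_files(tmp_dir) and promote its files only on success.

    Returns True if the output was committed, False if it was discarded.
    """
    # Hypothetical naming; the real Hadoop directory layout is not shown here.
    tmp_dir = os.path.join(output_dir, "_tmp_" + task_id)
    os.makedirs(tmp_dir)
    try:
        try:
            produce_files(tmp_dir)  # the task writes relative names in here
        except Exception:
            # The task failed while producing files: nothing is promoted.
            return False
        for name in os.listdir(tmp_dir):
            # Move each finished file to its final place in the output directory.
            os.replace(os.path.join(tmp_dir, name),
                       os.path.join(output_dir, name))
        return True
    finally:
        # Success or failure, the task-specific directory goes away.
        shutil.rmtree(tmp_dir, ignore_errors=True)


def write_image(tmp_dir):
    # Stand-in for a task that finishes cleanly.
    with open(os.path.join(tmp_dir, "group-a.img"), "wb") as f:
        f.write(b"\x00\x01\x02")


def crash_midway(tmp_dir):
    # Stand-in for a task that dies after writing a partial file.
    with open(os.path.join(tmp_dir, "group-b.img"), "wb") as f:
        f.write(b"\x00")
    raise RuntimeError("task killed")


if __name__ == "__main__":
    out = tempfile.mkdtemp()
    print(run_task(out, "attempt_0", write_image))
    print(run_task(out, "attempt_1", crash_midway))
    print(sorted(os.listdir(out)))
    shutil.rmtree(out)
```

The point of the sketch is only that a partial file never appears under its
final name: it lives in the task's own directory until the task is known to
have succeeded.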

I am curious how you demarcate the image boundaries in your current
output.  Hadoop streaming strongly presumes line-oriented data.  If that
isn't valid for your output, then you may have a program that is only
accidentally working by finding line boundaries in binary data.  In
particular, you may someday have a situation where some of the data
contains one kind of line boundary that is recognized on input, but the
corresponding boundary is generated in a different form on output.  For
instance, if your program sees CR-LF, it might take the pair as a line
boundary and emit just LF.  Even if this is not happening now, you may be
in for some trouble later.
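
A small Python sketch of the failure mode, plus one possible way around it.
The base64 framing is my own assumption for illustration, not something
streaming provides or that anyone in this thread has proposed; the function
names are made up, and the key is assumed to contain no tab or newline.

```python
import base64


def naive_line_roundtrip(data):
    """Mimic a line-oriented pipeline: split on any line ending, emit LF."""
    return b"\n".join(data.splitlines())


def encode_record(key, payload):
    """Pack one binary payload into a single line: key, TAB, base64 text."""
    # b64encode output uses only letters, digits, '+', '/' and '=',
    # so the record contains exactly one newline, the terminator.
    return key.encode("utf-8") + b"\t" + base64.b64encode(payload) + b"\n"


def decode_record(line):
    """Reverse encode_record, returning (key, payload)."""
    key, _, encoded = line.rstrip(b"\n").partition(b"\t")
    return key.decode("utf-8"), base64.b64decode(encoded)


if __name__ == "__main__":
    payload = b"\x89PNG\r\n\x1a\n\x00data"  # binary bytes containing CR-LF
    print(naive_line_roundtrip(payload) == payload)
    print(decode_record(encode_record("img1", payload)) == ("img1", payload))
```

The first function shows how a CR-LF inside binary data silently becomes a
bare LF; the other two keep each record on one line so that nothing in the
payload can be mistaken for a boundary.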


On 1/15/08 8:57 AM, "Yuri Pradkin" <[EMAIL PROTECTED]> wrote:

> Well, in our case the reducer munches key-value pairs to
> generate images; it's conceivable that we'll have other
> reducers in the future to do other interesting things.
> So, it would be impractical to move all that code into
> a RecordWriter.  We don't want to have a new RecordWriter
> for each new job, and we'd like to keep our processing
> code in languages other than Java, which is the only reason
> to use streaming, right?
> 
> 
> Do you think it would be a good solution to come up with a
> "generic" version of a record writer that would take as
> input, say: 
> <filename, filesize, rawbytes[filesize]>
> and do the actual writing?
> 
> Will Hadoop guarantee that only one "filename" will be
> created/written to even if there are racing tasks, and that
> the file will not be corrupted?
> 
> And what about memory requirements?  When filesize is large,
> would it all have to be stored in memory before it's written,
> or will Hadoop cache it in a temp file?
> 
> Thanks much for your input.
> 
>   -Yuri 
> 
> On Mon, Jan 14, 2008 at 01:06:13PM -0800, Runping Qi wrote:
>> 
>> One way to achieve your goal is to implement your own
>> OutputFormat/RecordWriter classes.
>> Your reducer will emit all the key/value pairs as in the normal case.
>> Your record writer class can open multiple output files and dispatch
>> each key/value pair to the appropriate file based on the actual values.
>> This way, the Hadoop framework takes care of all the issues related to
>> the namespace and the necessary cleanup of the output files.
>> 
>> 
>> Runping
>>  
>> 
>>> -----Original Message-----
>>> From: Yuri Pradkin [mailto:[EMAIL PROTECTED]
>>> Sent: Monday, January 14, 2008 12:33 PM
>>> To: hadoop-user@lucene.apache.org
>>> Subject: writing output files in hadoop streaming
>>> 
>>> Hi,
>>> 
>>> We've been using Hadoop streaming for the last 3-4 months and
>>> it all worked out fine except for one little problem:
>>> 
>>> in some situations a Hadoop reduce task gets multiple key groups
>>> and we want it to write out a separate binary output file for
>>> each group.  However, when a reduce task takes too long and
>>> there is spare capacity, the task may be replicated on another
>>> node and these two are basically racing each other.  One finishes
>>> cleanly and the other is terminated.  Hadoop takes care to remove
>>> the terminated task's output from HDFS, but since we're writing
>>> files from scripts, it's up to us to separate the output of cleanly
>>> finished tasks from the output of tasks that are terminated
>>> prematurely.
>>> 
>>> Does somebody have answers to the following questions:
>>> 1. Is there an easy way to tell in a script launched by Hadoop
>>>    streaming whether the script was terminated before it received
>>>    complete input?
>>>    As far as I was able to ascertain, no signals are being sent to
>>>    those Unix jobs.  They just stop receiving data from STDIN.  The
>>>    only way that seemed to work for me was to process all input and
>>>    then write something to STDOUT/STDERR and see if that causes a
>>>    SIGPIPE.  But this is ugly; I hope there is a better solution.
>>> 
>>> 2. Is there any good way to write multiple HDFS files from a streaming
>>>    script *and have Hadoop clean up those files* when it decides to
>>>    destroy the task?  If there was just one file, I could simply use
>>>    STDOUT, but dumping multiple binary files to STDOUT is not pretty.
>>> 
>>> We are writing output files to an NFS partition shared among all
>>> reducers, which makes it all slightly more complicated because of
>>> possible file overwrites.
>>> 
>>> Our current solution, which is not pretty but avoids directly
>>> addressing this problem, is to write out files with random names
>>> (created with mktemp) and write to STDOUT the renaming command for
>>> each file to its desired name.  Then, as a post-processing stage, I
>>> execute all those commands and delete the remaining temporary files
>>> as duplicates/incompletes.
>>> 
>>> Thanks,
>>> 
>>>   -Yuri
